This is an automated email from the ASF dual-hosted git repository.

srowen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new a2a41b7  [SPARK-26978][CORE][SQL] Avoid magic time constants
a2a41b7 is described below

commit a2a41b7bf2bfdcd1cff242013716ac7bd84bdacd
Author: Maxim Gekk <max.g...@gmail.com>
AuthorDate: Tue Feb 26 09:08:12 2019 -0600

    [SPARK-26978][CORE][SQL] Avoid magic time constants
    
    ## What changes were proposed in this pull request?
    
    In this PR, I propose to refactor existing code related to date/time 
conversions and to replace magic constants like `1000` and `1000000` with 
`DateTimeUtils` constants and conversion functions from 
`java.util.concurrent.TimeUnit._`.
    
    ## How was this patch tested?
    
    The changes are tested by existing test suites.
    
    Closes #23878 from MaxGekk/magic-time-constants.
    
    Lead-authored-by: Maxim Gekk <max.g...@gmail.com>
    Co-authored-by: Maxim Gekk <maxim.g...@databricks.com>
    Signed-off-by: Sean Owen <sean.o...@databricks.com>
---
 .../org/apache/spark/BarrierTaskContext.scala      | 12 +--
 .../org/apache/spark/deploy/master/Master.scala    |  5 +-
 .../org/apache/spark/benchmark/Benchmark.scala     |  4 +-
 .../spark/sql/catalyst/expressions/Cast.scala      | 34 ++++----
 .../expressions/codegen/CodeGenerator.scala        |  6 +-
 .../catalyst/expressions/datetimeExpressions.scala |  8 +-
 .../spark/sql/catalyst/expressions/hash.scala      |  6 +-
 .../sql/catalyst/optimizer/finishAnalysis.scala    |  6 +-
 .../plans/logical/EventTimeWatermark.scala         |  4 +-
 .../catalyst/rules/QueryExecutionMetering.scala    |  4 +-
 .../spark/sql/catalyst/util/DateTimeUtils.scala    | 90 ++++++++++++----------
 .../apache/spark/sql/catalyst/util/package.scala   | 20 -----
 .../spark/sql/catalyst/expressions/CastSuite.scala | 25 +++---
 .../expressions/DateExpressionsSuite.scala         | 17 ++--
 .../scala/org/apache/spark/sql/SparkSession.scala  |  3 +-
 .../spark/sql/execution/DataSourceScanExec.scala   | 10 ++-
 .../org/apache/spark/sql/execution/SortExec.scala  |  7 +-
 .../execution/aggregate/HashAggregateExec.scala    |  9 ++-
 .../aggregate/ObjectHashAggregateExec.scala        |  4 +-
 .../sql/execution/basicPhysicalOperators.scala     |  4 +-
 .../apache/spark/sql/execution/command/ddl.scala   |  3 +-
 .../execution/exchange/BroadcastExchangeExec.scala |  6 +-
 .../sql/execution/joins/ShuffledHashJoinExec.scala |  4 +-
 .../streaming/EventTimeWatermarkExec.scala         |  3 +-
 .../sql/execution/streaming/FileStreamSource.scala |  5 +-
 .../sql/execution/streaming/GroupStateImpl.scala   |  3 +-
 .../sql/execution/streaming/ProgressReporter.scala |  8 +-
 .../streaming/continuous/ContinuousTrigger.scala   |  2 +-
 .../spark/sql/streaming/ProcessingTime.scala       |  2 +-
 .../org/apache/spark/sql/DateFunctionsSuite.scala  | 47 ++++++-----
 .../streaming/sources/TextSocketStreamSuite.scala  |  4 +-
 .../sql/streaming/EventTimeWatermarkSuite.scala    |  9 ++-
 .../org/apache/spark/sql/hive/HiveInspectors.scala |  5 +-
 .../spark/sql/hive/client/HiveClientImpl.scala     |  9 ++-
 .../apache/spark/sql/hive/client/HiveShim.scala    |  2 +-
 .../streaming/util/RateLimitedOutputStream.scala   |  2 +-
 36 files changed, 210 insertions(+), 182 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/BarrierTaskContext.scala 
b/core/src/main/scala/org/apache/spark/BarrierTaskContext.scala
index 6a497af..2d842b9 100644
--- a/core/src/main/scala/org/apache/spark/BarrierTaskContext.scala
+++ b/core/src/main/scala/org/apache/spark/BarrierTaskContext.scala
@@ -109,8 +109,8 @@ class BarrierTaskContext private[spark] (
       override def run(): Unit = {
         logInfo(s"Task $taskAttemptId from Stage $stageId(Attempt 
$stageAttemptNumber) waiting " +
           s"under the global sync since $startTime, has been waiting for " +
-          s"${(System.currentTimeMillis() - startTime) / 1000} seconds, 
current barrier epoch " +
-          s"is $barrierEpoch.")
+          s"${MILLISECONDS.toSeconds(System.currentTimeMillis() - startTime)} 
seconds, " +
+          s"current barrier epoch is $barrierEpoch.")
       }
     }
     // Log the update of global sync every 60 seconds.
@@ -126,14 +126,14 @@ class BarrierTaskContext private[spark] (
       barrierEpoch += 1
       logInfo(s"Task $taskAttemptId from Stage $stageId(Attempt 
$stageAttemptNumber) finished " +
         "global sync successfully, waited for " +
-        s"${(System.currentTimeMillis() - startTime) / 1000} seconds, current 
barrier epoch is " +
-        s"$barrierEpoch.")
+        s"${MILLISECONDS.toSeconds(System.currentTimeMillis() - startTime)} 
seconds, " +
+        s"current barrier epoch is $barrierEpoch.")
     } catch {
       case e: SparkException =>
         logInfo(s"Task $taskAttemptId from Stage $stageId(Attempt 
$stageAttemptNumber) failed " +
           "to perform global sync, waited for " +
-          s"${(System.currentTimeMillis() - startTime) / 1000} seconds, 
current barrier epoch " +
-          s"is $barrierEpoch.")
+          s"${MILLISECONDS.toSeconds(System.currentTimeMillis() - startTime)} 
seconds, " +
+          s"current barrier epoch is $barrierEpoch.")
         throw e
     } finally {
       timerTask.cancel()
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala 
b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index b26da8a..3dd804b 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -994,9 +994,10 @@ private[deploy] class Master(
     val toRemove = workers.filter(_.lastHeartbeat < currentTime - 
workerTimeoutMs).toArray
     for (worker <- toRemove) {
       if (worker.state != WorkerState.DEAD) {
+        val workerTimeoutSecs = 
TimeUnit.MILLISECONDS.toSeconds(workerTimeoutMs)
         logWarning("Removing %s because we got no heartbeat in %d 
seconds".format(
-          worker.id, workerTimeoutMs / 1000))
-        removeWorker(worker, s"Not receiving heartbeat for ${workerTimeoutMs / 
1000} seconds")
+          worker.id, workerTimeoutSecs))
+        removeWorker(worker, s"Not receiving heartbeat for $workerTimeoutSecs 
seconds")
       } else {
         if (worker.lastHeartbeat < currentTime - ((reaperIterations + 1) * 
workerTimeoutMs)) {
           workers -= worker // we've seen this DEAD worker in the UI, etc. for 
long enough; cull it
diff --git a/core/src/test/scala/org/apache/spark/benchmark/Benchmark.scala 
b/core/src/test/scala/org/apache/spark/benchmark/Benchmark.scala
index bb389cd..df1ed28 100644
--- a/core/src/test/scala/org/apache/spark/benchmark/Benchmark.scala
+++ b/core/src/test/scala/org/apache/spark/benchmark/Benchmark.scala
@@ -148,13 +148,13 @@ private[spark] class Benchmark(
 
       if (outputPerIteration) {
         // scalastyle:off
-        println(s"Iteration $i took ${runTime / 1000} microseconds")
+        println(s"Iteration $i took ${NANOSECONDS.toMicros(runTime)} 
microseconds")
         // scalastyle:on
       }
       i += 1
     }
     // scalastyle:off
-    println(s"  Stopped after $i iterations, ${runTimes.sum / 1000000} ms")
+    println(s"  Stopped after $i iterations, 
${NANOSECONDS.toMillis(runTimes.sum)} ms")
     // scalastyle:on
     val best = runTimes.min
     val avg = runTimes.sum / runTimes.size
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala
index b20249f..d591c58 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.sql.catalyst.expressions
 
 import java.math.{BigDecimal => JavaBigDecimal}
+import java.util.concurrent.TimeUnit._
 
 import org.apache.spark.SparkException
 import org.apache.spark.sql.catalyst.InternalRow
@@ -25,6 +26,7 @@ import 
org.apache.spark.sql.catalyst.analysis.{TypeCheckResult, TypeCoercion}
 import org.apache.spark.sql.catalyst.expressions.codegen._
 import org.apache.spark.sql.catalyst.expressions.codegen.Block._
 import org.apache.spark.sql.catalyst.util._
+import org.apache.spark.sql.catalyst.util.DateTimeUtils._
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.{CalendarInterval, UTF8String}
 import org.apache.spark.unsafe.types.UTF8String.{IntWrapper, LongWrapper}
@@ -374,7 +376,7 @@ case class Cast(child: Expression, dataType: DataType, 
timeZoneId: Option[String
     case ByteType =>
       buildCast[Byte](_, b => longToTimestamp(b.toLong))
     case DateType =>
-      buildCast[Int](_, d => DateTimeUtils.daysToMillis(d, timeZone) * 1000)
+      buildCast[Int](_, d => 
MILLISECONDS.toMicros(DateTimeUtils.daysToMillis(d, timeZone)))
     // TimestampWritable.decimalToTimestamp
     case DecimalType() =>
       buildCast[Decimal](_, d => decimalToTimestamp(d))
@@ -387,21 +389,21 @@ case class Cast(child: Expression, dataType: DataType, 
timeZoneId: Option[String
   }
 
   private[this] def decimalToTimestamp(d: Decimal): Long = {
-    (d.toBigDecimal * 1000000L).longValue()
+    (d.toBigDecimal * MICROS_PER_SECOND).longValue()
   }
   private[this] def doubleToTimestamp(d: Double): Any = {
-    if (d.isNaN || d.isInfinite) null else (d * 1000000L).toLong
+    if (d.isNaN || d.isInfinite) null else (d * MICROS_PER_SECOND).toLong
   }
 
   // converting seconds to us
-  private[this] def longToTimestamp(t: Long): Long = t * 1000000L
+  private[this] def longToTimestamp(t: Long): Long = SECONDS.toMicros(t)
   // converting us to seconds
   private[this] def timestampToLong(ts: Long): Long = {
-    Math.floorDiv(ts, DateTimeUtils.MICROS_PER_SECOND)
+    Math.floorDiv(ts, MICROS_PER_SECOND)
   }
   // converting us to seconds in double
   private[this] def timestampToDouble(ts: Long): Double = {
-    ts / 1000000.0
+    ts / MICROS_PER_SECOND.toDouble
   }
 
   // DateConverter
@@ -411,7 +413,7 @@ case class Cast(child: Expression, dataType: DataType, 
timeZoneId: Option[String
     case TimestampType =>
       // throw valid precision more than seconds, according to Hive.
       // Timestamp.nanos is in 0 to 999,999,999, no more than a second.
-      buildCast[Long](_, t => DateTimeUtils.millisToDays(t / 1000L, timeZone))
+      buildCast[Long](_, t => 
DateTimeUtils.millisToDays(MICROSECONDS.toMillis(t), timeZone))
   }
 
   // IntervalConverter
@@ -927,7 +929,8 @@ case class Cast(child: Expression, dataType: DataType, 
timeZoneId: Option[String
       val tz = JavaCode.global(ctx.addReferenceObj("timeZone", timeZone), 
timeZone.getClass)
       (c, evPrim, evNull) =>
         code"""$evPrim =
-          org.apache.spark.sql.catalyst.util.DateTimeUtils.millisToDays($c / 
1000L, $tz);"""
+          org.apache.spark.sql.catalyst.util.DateTimeUtils.millisToDays(
+            $c / $MICROS_PER_MILLIS, $tz);"""
     case _ =>
       (c, evPrim, evNull) => code"$evNull = true;"
   }
@@ -1034,7 +1037,8 @@ case class Cast(child: Expression, dataType: DataType, 
timeZoneId: Option[String
       val tz = JavaCode.global(ctx.addReferenceObj("timeZone", timeZone), 
timeZone.getClass)
       (c, evPrim, evNull) =>
         code"""$evPrim =
-          org.apache.spark.sql.catalyst.util.DateTimeUtils.daysToMillis($c, 
$tz) * 1000;"""
+          org.apache.spark.sql.catalyst.util.DateTimeUtils.daysToMillis(
+            $c, $tz) * $MICROS_PER_MILLIS;"""
     case DecimalType() =>
       (c, evPrim, evNull) => code"$evPrim = ${decimalToTimestampCode(c)};"
     case DoubleType =>
@@ -1043,7 +1047,7 @@ case class Cast(child: Expression, dataType: DataType, 
timeZoneId: Option[String
           if (Double.isNaN($c) || Double.isInfinite($c)) {
             $evNull = true;
           } else {
-            $evPrim = (long)($c * 1000000L);
+            $evPrim = (long)($c * $MICROS_PER_SECOND);
           }
         """
     case FloatType =>
@@ -1052,7 +1056,7 @@ case class Cast(child: Expression, dataType: DataType, 
timeZoneId: Option[String
           if (Float.isNaN($c) || Float.isInfinite($c)) {
             $evNull = true;
           } else {
-            $evPrim = (long)($c * 1000000L);
+            $evPrim = (long)($c * $MICROS_PER_SECOND);
           }
         """
   }
@@ -1069,14 +1073,14 @@ case class Cast(child: Expression, dataType: DataType, 
timeZoneId: Option[String
   }
 
   private[this] def decimalToTimestampCode(d: ExprValue): Block = {
-    val block = inline"new java.math.BigDecimal(1000000L)"
+    val block = inline"new java.math.BigDecimal($MICROS_PER_SECOND)"
     code"($d.toBigDecimal().bigDecimal().multiply($block)).longValue()"
   }
-  private[this] def longToTimeStampCode(l: ExprValue): Block = code"$l * 
1000000L"
+  private[this] def longToTimeStampCode(l: ExprValue): Block = code"$l * 
(long)$MICROS_PER_SECOND"
   private[this] def timestampToIntegerCode(ts: ExprValue): Block =
-    code"java.lang.Math.floorDiv($ts, 1000000L)"
+    code"java.lang.Math.floorDiv($ts, $MICROS_PER_SECOND)"
   private[this] def timestampToDoubleCode(ts: ExprValue): Block =
-    code"$ts / 1000000.0"
+    code"$ts / (double)$MICROS_PER_SECOND"
 
   private[this] def castToBooleanCode(from: DataType): CastFunction = from 
match {
     case StringType =>
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
index 7c8f7cd..b9365f0 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
@@ -32,7 +32,7 @@ import org.codehaus.commons.compiler.CompileException
 import org.codehaus.janino.{ByteArrayClassLoader, ClassBodyEvaluator, 
InternalCompilerException, SimpleCompiler}
 import org.codehaus.janino.util.ClassFile
 
-import org.apache.spark.{SparkEnv, TaskContext, TaskKilledException}
+import org.apache.spark.{TaskContext, TaskKilledException}
 import org.apache.spark.executor.InputMetrics
 import org.apache.spark.internal.Logging
 import org.apache.spark.metrics.source.CodegenMetrics
@@ -40,10 +40,10 @@ import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.codegen.Block._
 import org.apache.spark.sql.catalyst.util.{ArrayData, GenericArrayData, 
MapData}
+import org.apache.spark.sql.catalyst.util.DateTimeUtils._
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.Platform
-import org.apache.spark.unsafe.array.ByteArrayMethods
 import org.apache.spark.unsafe.types._
 import org.apache.spark.util.{ParentClassLoader, Utils}
 
@@ -1372,7 +1372,7 @@ object CodeGenerator extends Logging {
           val startTime = System.nanoTime()
           val result = doCompile(code)
           val endTime = System.nanoTime()
-          def timeMs: Double = (endTime - startTime).toDouble / 1000000
+          def timeMs: Double = (endTime - startTime).toDouble / 
NANOS_PER_MILLIS
           CodegenMetrics.METRIC_SOURCE_CODE_SIZE.update(code.body.length)
           CodegenMetrics.METRIC_COMPILATION_TIME.update(timeMs.toLong)
           logInfo(s"Code generated in $timeMs ms")
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala
index ec59502..4cb0031 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala
@@ -694,7 +694,7 @@ abstract class UnixTime
             $javaType ${ev.value} = ${CodeGenerator.defaultValue(dataType)};
             if (!${ev.isNull}) {
               try {
-                ${ev.value} = $formatterName.parse(${eval1.value}.toString()) 
/ 1000000L;
+                ${ev.value} = $formatterName.parse(${eval1.value}.toString()) 
/ $MICROS_PER_SECOND;
               } catch (java.lang.IllegalArgumentException e) {
                 ${ev.isNull} = true;
               } catch (java.text.ParseException e) {
@@ -714,7 +714,7 @@ abstract class UnixTime
           s"""
             try {
               ${ev.value} = $tf$$.MODULE$$.apply($format.toString(), $tz, 
$locale)
-                .parse($string.toString()) / 1000000L;
+                .parse($string.toString()) / $MICROS_PER_SECOND;
             } catch (java.lang.IllegalArgumentException e) {
               ${ev.isNull} = true;
             } catch (java.text.ParseException e) {
@@ -733,7 +733,7 @@ abstract class UnixTime
           boolean ${ev.isNull} = ${eval1.isNull};
           $javaType ${ev.value} = ${CodeGenerator.defaultValue(dataType)};
           if (!${ev.isNull}) {
-            ${ev.value} = ${eval1.value} / 1000000L;
+            ${ev.value} = ${eval1.value} / $MICROS_PER_SECOND;
           }""")
       case DateType =>
         val tz = ctx.addReferenceObj("timeZone", timeZone)
@@ -744,7 +744,7 @@ abstract class UnixTime
           boolean ${ev.isNull} = ${eval1.isNull};
           $javaType ${ev.value} = ${CodeGenerator.defaultValue(dataType)};
           if (!${ev.isNull}) {
-            ${ev.value} = $dtu.daysToMillis(${eval1.value}, $tz) / 1000L;
+            ${ev.value} = $dtu.daysToMillis(${eval1.value}, $tz) / 
$MILLIS_PER_SECOND;
           }""")
     }
   }
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/hash.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/hash.scala
index 742a4f8..8d17b07 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/hash.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/hash.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.catalyst.expressions
 
 import java.math.{BigDecimal, RoundingMode}
 import java.security.{MessageDigest, NoSuchAlgorithmException}
+import java.util.concurrent.TimeUnit._
 import java.util.zip.CRC32
 
 import scala.annotation.tailrec
@@ -30,6 +31,7 @@ import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
 import org.apache.spark.sql.catalyst.expressions.codegen._
 import org.apache.spark.sql.catalyst.expressions.codegen.Block._
 import org.apache.spark.sql.catalyst.util.{ArrayData, MapData}
+import org.apache.spark.sql.catalyst.util.DateTimeUtils._
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.Platform
 import org.apache.spark.unsafe.hash.Murmur3_x86_32
@@ -863,8 +865,8 @@ object HiveHashFunction extends InterpretedHashFunction {
    * Mimics TimestampWritable.hashCode() in Hive
    */
   def hashTimestamp(timestamp: Long): Long = {
-    val timestampInSeconds = timestamp / 1000000
-    val nanoSecondsPortion = (timestamp % 1000000) * 1000
+    val timestampInSeconds = MICROSECONDS.toSeconds(timestamp)
+    val nanoSecondsPortion = (timestamp % MICROS_PER_SECOND) * NANOS_PER_MICROS
 
     var result = timestampInSeconds
     result <<= 30 // the nanosecond part fits in 30 bits
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/finishAnalysis.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/finishAnalysis.scala
index fe196ec..4094864 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/finishAnalysis.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/finishAnalysis.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql.catalyst.optimizer
 
+import java.util.concurrent.TimeUnit._
+
 import scala.collection.mutable
 
 import org.apache.spark.sql.catalyst.catalog.SessionCatalog
@@ -65,7 +67,9 @@ object ComputeCurrentTime extends Rule[LogicalPlan] {
       case CurrentDate(Some(timeZoneId)) =>
         currentDates.getOrElseUpdate(timeZoneId, {
           Literal.create(
-            DateTimeUtils.millisToDays(timestamp / 1000L, 
DateTimeUtils.getTimeZone(timeZoneId)),
+            DateTimeUtils.millisToDays(
+              MICROSECONDS.toMillis(timestamp),
+              DateTimeUtils.getTimeZone(timeZoneId)),
             DateType)
         })
       case CurrentTimestamp() => currentTime
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/EventTimeWatermark.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/EventTimeWatermark.scala
index 7a927e1..8441c2c 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/EventTimeWatermark.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/EventTimeWatermark.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql.catalyst.plans.logical
 
+import java.util.concurrent.TimeUnit
+
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.types.MetadataBuilder
 import org.apache.spark.unsafe.types.CalendarInterval
@@ -27,7 +29,7 @@ object EventTimeWatermark {
 
   def getDelayMs(delay: CalendarInterval): Long = {
     // We define month as `31 days` to simplify calculation.
-    val millisPerMonth = CalendarInterval.MICROS_PER_DAY / 1000 * 31
+    val millisPerMonth = 
TimeUnit.MICROSECONDS.toMillis(CalendarInterval.MICROS_PER_DAY) * 31
     delay.milliseconds + delay.months * millisPerMonth
   }
 }
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/QueryExecutionMetering.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/QueryExecutionMetering.scala
index 62f7541..e4d5fa9 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/QueryExecutionMetering.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/QueryExecutionMetering.scala
@@ -21,6 +21,8 @@ import scala.collection.JavaConverters._
 
 import com.google.common.util.concurrent.AtomicLongMap
 
+import org.apache.spark.sql.catalyst.util.DateTimeUtils.NANOS_PER_SECOND
+
 case class QueryExecutionMetering() {
   private val timeMap = AtomicLongMap.create[String]()
   private val numRunsMap = AtomicLongMap.create[String]()
@@ -82,7 +84,7 @@ case class QueryExecutionMetering() {
     s"""
        |=== Metrics of Analyzer/Optimizer Rules ===
        |Total number of runs: $totalNumRuns
-       |Total time: ${totalTime / 1000000000D} seconds
+       |Total time: ${totalTime / NANOS_PER_SECOND.toDouble} seconds
        |
        |$colRuleName $colRunTime $colNumRuns
        |$ruleMetrics
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
index d714d29..627670a 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
@@ -22,7 +22,7 @@ import java.time._
 import java.time.Year.isLeap
 import java.time.temporal.IsoFields
 import java.util.{Locale, TimeZone}
-import java.util.concurrent.TimeUnit
+import java.util.concurrent.TimeUnit._
 
 import scala.util.control.NonFatal
 
@@ -44,14 +44,18 @@ object DateTimeUtils {
   // see 
http://stackoverflow.com/questions/466321/convert-unix-timestamp-to-julian
   // it's 2440587.5, rounding up to compatible with Hive
   final val JULIAN_DAY_OF_EPOCH = 2440588
-  final val SECONDS_PER_DAY = 60 * 60 * 24L
-  final val MICROS_PER_MILLIS = 1000L
-  final val MICROS_PER_SECOND = MICROS_PER_MILLIS * MILLIS_PER_SECOND
-  final val MILLIS_PER_SECOND = 1000L
-  final val NANOS_PER_SECOND = MICROS_PER_SECOND * 1000L
-  final val MICROS_PER_DAY = MICROS_PER_SECOND * SECONDS_PER_DAY
-  final val NANOS_PER_MICROS = 1000L
-  final val MILLIS_PER_DAY = SECONDS_PER_DAY * 1000L
+
+  final val NANOS_PER_MICROS = MICROSECONDS.toNanos(1)
+  final val NANOS_PER_MILLIS = MILLISECONDS.toNanos(1)
+  final val NANOS_PER_SECOND = SECONDS.toNanos(1)
+  final val MICROS_PER_MILLIS = MILLISECONDS.toMicros(1)
+  final val MICROS_PER_SECOND = SECONDS.toMicros(1)
+  final val MICROS_PER_DAY = DAYS.toMicros(1)
+  final val MILLIS_PER_SECOND = SECONDS.toMillis(1)
+  final val MILLIS_PER_MINUTE = MINUTES.toMillis(1)
+  final val MILLIS_PER_HOUR = HOURS.toMillis(1)
+  final val MILLIS_PER_DAY = DAYS.toMillis(1)
+  final val SECONDS_PER_DAY = DAYS.toSeconds(1)
 
   // number of days between 1.1.1970 and 1.1.2001
   final val to2001 = -11323
@@ -133,8 +137,8 @@ object DateTimeUtils {
       micros += MICROS_PER_SECOND
       seconds -= 1
     }
-    val t = new Timestamp(seconds * 1000)
-    t.setNanos(micros.toInt * 1000)
+    val t = new Timestamp(SECONDS.toMillis(seconds))
+    t.setNanos(MICROSECONDS.toNanos(micros).toInt)
     t
   }
 
@@ -143,7 +147,7 @@ object DateTimeUtils {
    */
   def fromJavaTimestamp(t: Timestamp): SQLTimestamp = {
     if (t != null) {
-      t.getTime() * 1000L + (t.getNanos().toLong / 1000) % 1000L
+      MILLISECONDS.toMicros(t.getTime()) + NANOSECONDS.toMicros(t.getNanos()) 
% NANOS_PER_MICROS
     } else {
       0L
     }
@@ -156,7 +160,7 @@ object DateTimeUtils {
   def fromJulianDay(day: Int, nanoseconds: Long): SQLTimestamp = {
     // use Long to avoid rounding errors
     val seconds = (day - JULIAN_DAY_OF_EPOCH).toLong * SECONDS_PER_DAY
-    seconds * MICROS_PER_SECOND + nanoseconds / 1000L
+    SECONDS.toMicros(seconds) + NANOSECONDS.toMicros(nanoseconds)
   }
 
   /**
@@ -168,7 +172,7 @@ object DateTimeUtils {
     val julian_us = us + JULIAN_DAY_OF_EPOCH * MICROS_PER_DAY
     val day = julian_us / MICROS_PER_DAY
     val micros = julian_us % MICROS_PER_DAY
-    (day.toInt, micros * 1000L)
+    (day.toInt, MICROSECONDS.toNanos(micros))
   }
 
   /*
@@ -186,7 +190,7 @@ object DateTimeUtils {
    * Converts milliseconds since epoch to SQLTimestamp.
    */
   def fromMillis(millis: Long): SQLTimestamp = {
-    millis * MICROS_PER_MILLIS
+    MILLISECONDS.toMicros(millis)
   }
 
   /**
@@ -329,7 +333,7 @@ object DateTimeUtils {
         val sign = if (tz.get.toChar == '-') -1 else 1
         ZoneId.ofOffset("GMT", ZoneOffset.ofHoursMinutes(sign * segments(7), 
sign * segments(8)))
       }
-      val nanoseconds = TimeUnit.MICROSECONDS.toNanos(segments(6))
+      val nanoseconds = MICROSECONDS.toNanos(segments(6))
       val localTime = LocalTime.of(segments(3), segments(4), segments(5), 
nanoseconds.toInt)
       val localDate = if (justTime) {
         LocalDate.now(zoneId)
@@ -346,8 +350,8 @@ object DateTimeUtils {
   }
 
   def instantToMicros(instant: Instant): Long = {
-    val sec = Math.multiplyExact(instant.getEpochSecond, MICROS_PER_SECOND)
-    val result = Math.addExact(sec, instant.getNano / NANOS_PER_MICROS)
+    val us = Math.multiplyExact(instant.getEpochSecond, MICROS_PER_SECOND)
+    val result = Math.addExact(us, NANOSECONDS.toMicros(instant.getNano))
     result
   }
 
@@ -420,14 +424,15 @@ object DateTimeUtils {
   }
 
   private def localTimestamp(microsec: SQLTimestamp, timeZone: TimeZone): 
SQLTimestamp = {
-    absoluteMicroSecond(microsec) + timeZone.getOffset(microsec / 1000) * 1000L
+    val zoneOffsetUs = 
MILLISECONDS.toMicros(timeZone.getOffset(MICROSECONDS.toMillis(microsec)))
+    absoluteMicroSecond(microsec) + zoneOffsetUs
   }
 
   /**
    * Returns the hour value of a given timestamp value. The timestamp is 
expressed in microseconds.
    */
   def getHours(microsec: SQLTimestamp, timeZone: TimeZone): Int = {
-    ((localTimestamp(microsec, timeZone) / MICROS_PER_SECOND / 3600) % 
24).toInt
+    (MICROSECONDS.toHours(localTimestamp(microsec, timeZone)) % 24).toInt
   }
 
   /**
@@ -435,7 +440,7 @@ object DateTimeUtils {
    * microseconds.
    */
   def getMinutes(microsec: SQLTimestamp, timeZone: TimeZone): Int = {
-    ((localTimestamp(microsec, timeZone) / MICROS_PER_SECOND / 60) % 60).toInt
+    (MICROSECONDS.toMinutes(localTimestamp(microsec, timeZone)) % 60).toInt
   }
 
   /**
@@ -443,7 +448,7 @@ object DateTimeUtils {
    * microseconds.
    */
   def getSeconds(microsec: SQLTimestamp, timeZone: TimeZone): Int = {
-    ((localTimestamp(microsec, timeZone) / MICROS_PER_SECOND) % 60).toInt
+    (MICROSECONDS.toSeconds(localTimestamp(microsec, timeZone)) % 60).toInt
   }
 
   /**
@@ -560,10 +565,10 @@ object DateTimeUtils {
       months: Int,
       microseconds: Long,
       timeZone: TimeZone): SQLTimestamp = {
-    val days = millisToDays(start / 1000L, timeZone)
+    val days = millisToDays(MICROSECONDS.toMillis(start), timeZone)
     val newDays = dateAddMonths(days, months)
     start +
-      daysToMillis(newDays, timeZone) * 1000L - daysToMillis(days, timeZone) * 
1000L +
+      MILLISECONDS.toMicros(daysToMillis(newDays, timeZone) - 
daysToMillis(days, timeZone)) +
       microseconds
   }
 
@@ -582,8 +587,8 @@ object DateTimeUtils {
       time2: SQLTimestamp,
       roundOff: Boolean,
       timeZone: TimeZone): Double = {
-    val millis1 = time1 / 1000L
-    val millis2 = time2 / 1000L
+    val millis1 = MICROSECONDS.toMillis(time1)
+    val millis2 = MICROSECONDS.toMillis(time2)
     val date1 = millisToDays(millis1, timeZone)
     val date2 = millisToDays(millis2, timeZone)
     val (year1, monthInYear1, dayInMonth1, daysToMonthEnd1) = splitDate(date1)
@@ -599,12 +604,11 @@ object DateTimeUtils {
     }
     // using milliseconds can cause precision loss with more than 8 digits
     // we follow Hive's implementation which uses seconds
-    val secondsInDay1 = (millis1 - daysToMillis(date1, timeZone)) / 1000L
-    val secondsInDay2 = (millis2 - daysToMillis(date2, timeZone)) / 1000L
+    val secondsInDay1 = MILLISECONDS.toSeconds(millis1 - daysToMillis(date1, 
timeZone))
+    val secondsInDay2 = MILLISECONDS.toSeconds(millis2 - daysToMillis(date2, 
timeZone))
     val secondsDiff = (dayInMonth1 - dayInMonth2) * SECONDS_PER_DAY + 
secondsInDay1 - secondsInDay2
-    // 2678400D is the number of seconds in 31 days
-    // every month is considered to be 31 days long in this function
-    val diff = monthDiff + secondsDiff / 2678400D
+    val secondsInMonth = DAYS.toSeconds(31)
+    val diff = monthDiff + secondsDiff / secondsInMonth.toDouble
     if (roundOff) {
       // rounding to 8 digits
       math.round(diff * 1e8) / 1e8
@@ -688,7 +692,7 @@ object DateTimeUtils {
    * Trunc level should be generated using `parseTruncLevel()`, should be 
between 1 and 8
    */
   def truncTimestamp(t: SQLTimestamp, level: Int, timeZone: TimeZone): 
SQLTimestamp = {
-    var millis = t / MICROS_PER_MILLIS
+    var millis = MICROSECONDS.toMillis(t)
     val truncated = level match {
       case TRUNC_TO_YEAR =>
         val dDays = millisToDays(millis, timeZone)
@@ -699,13 +703,13 @@ object DateTimeUtils {
       case TRUNC_TO_DAY =>
         val offset = timeZone.getOffset(millis)
         millis += offset
-        millis - millis % (MILLIS_PER_SECOND * SECONDS_PER_DAY) - offset
+        millis - millis % MILLIS_PER_DAY - offset
       case TRUNC_TO_HOUR =>
         val offset = timeZone.getOffset(millis)
         millis += offset
-        millis - millis % (60 * 60 * MILLIS_PER_SECOND) - offset
+        millis - millis % MILLIS_PER_HOUR - offset
       case TRUNC_TO_MINUTE =>
-        millis - millis % (60 * MILLIS_PER_SECOND)
+        millis - millis % MILLIS_PER_MINUTE
       case TRUNC_TO_SECOND =>
         millis - millis % MILLIS_PER_SECOND
       case TRUNC_TO_WEEK =>
@@ -761,8 +765,8 @@ object DateTimeUtils {
       if (guess != offset) {
         // fallback to do the reverse lookup using java.time.LocalDateTime
         // this should only happen near the start or end of DST
-        val localDate = 
LocalDate.ofEpochDay(TimeUnit.MILLISECONDS.toDays(millisLocal))
-        val localTime = LocalTime.ofNanoOfDay(TimeUnit.MILLISECONDS.toNanos(
+        val localDate = LocalDate.ofEpochDay(MILLISECONDS.toDays(millisLocal))
+        val localTime = LocalTime.ofNanoOfDay(MILLISECONDS.toNanos(
           Math.floorMod(millisLocal, MILLIS_PER_DAY)))
         val localDateTime = LocalDateTime.of(localDate, localTime)
         val millisEpoch = 
localDateTime.atZone(tz.toZoneId).toInstant.toEpochMilli
@@ -787,15 +791,19 @@ object DateTimeUtils {
       ts
     } else {
       // get the human time using local time zone, that actually is in fromZone.
-      val localTs = ts + localZone.getOffset(ts / 1000L) * 1000L  // in 
fromZone
-      localTs - getOffsetFromLocalMillis(localTs / 1000L, fromZone) * 1000L
+      val localZoneOffsetMs = localZone.getOffset(MICROSECONDS.toMillis(ts))
+      val localTsUs = ts + MILLISECONDS.toMicros(localZoneOffsetMs)  // in 
fromZone
+      val offsetFromLocalMs = 
getOffsetFromLocalMillis(MICROSECONDS.toMillis(localTsUs), fromZone)
+      localTsUs - MILLISECONDS.toMicros(offsetFromLocalMs)
     }
     if (toZone.getID == localZone.getID) {
       utcTs
     } else {
-      val localTs = utcTs + toZone.getOffset(utcTs / 1000L) * 1000L  // in 
toZone
+      val toZoneOffsetMs = toZone.getOffset(MICROSECONDS.toMillis(utcTs))
+      val localTsUs = utcTs + MILLISECONDS.toMicros(toZoneOffsetMs)  // in 
toZone
       // treat it as local timezone, convert to UTC (we could get the expected human time back)
-      localTs - getOffsetFromLocalMillis(localTs / 1000L, localZone) * 1000L
+      val offsetFromLocalMs = 
getOffsetFromLocalMillis(MICROSECONDS.toMillis(localTsUs), localZone)
+      localTsUs - MILLISECONDS.toMicros(offsetFromLocalMs)
     }
   }
 
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/package.scala 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/package.scala
index 7f5860e..a5dbc75 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/package.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/package.scala
@@ -125,18 +125,6 @@ package object util extends Logging {
     new String(out.toByteArray, StandardCharsets.UTF_8)
   }
 
-  def stringOrNull(a: AnyRef): String = if (a == null) null else a.toString
-
-  def benchmark[A](f: => A): A = {
-    val startTime = System.nanoTime()
-    val ret = f
-    val endTime = System.nanoTime()
-    // scalastyle:off println
-    println(s"${(endTime - startTime).toDouble / 1000000}ms")
-    // scalastyle:on println
-    ret
-  }
-
   // Replaces attributes, string literals, complex type extractors with their pretty form so that
   // generated column names don't contain back-ticks or double-quotes.
   def usePrettyExpression(e: Expression): Expression = e transform {
@@ -158,7 +146,6 @@ package object util extends Logging {
 
   def toPrettySQL(e: Expression): String = usePrettyExpression(e).sql
 
-
   def escapeSingleQuotedString(str: String): String = {
     val builder = StringBuilder.newBuilder
 
@@ -203,11 +190,4 @@ package object util extends Logging {
   def truncatedString[T](seq: Seq[T], sep: String, maxFields: Int): String = {
     truncatedString(seq, "", sep, "", maxFields)
   }
-
-  /* FIX ME
-  implicit class debugLogging(a: Any) {
-    def debugLogging() {
-      
org.apache.log4j.Logger.getLogger(a.getClass.getName).setLevel(org.apache.log4j.Level.DEBUG)
-    }
-  } */
 }
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CastSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CastSuite.scala
index 11956e1..d812504 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CastSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CastSuite.scala
@@ -18,9 +18,8 @@
 package org.apache.spark.sql.catalyst.expressions
 
 import java.sql.{Date, Timestamp}
-import java.util.{Calendar, Locale, TimeZone}
-
-import scala.util.Random
+import java.util.{Calendar, TimeZone}
+import java.util.concurrent.TimeUnit._
 
 import org.apache.spark.SparkFunSuite
 import org.apache.spark.sql.Row
@@ -29,7 +28,7 @@ import 
org.apache.spark.sql.catalyst.analysis.TypeCoercion.numericPrecedence
 import org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext
 import org.apache.spark.sql.catalyst.util.DateTimeTestUtils._
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
-import org.apache.spark.sql.catalyst.util.DateTimeUtils.TimeZoneGMT
+import org.apache.spark.sql.catalyst.util.DateTimeUtils._
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.UTF8String
 
@@ -321,13 +320,13 @@ class CastSuite extends SparkFunSuite with 
ExpressionEvalHelper {
       checkEvaluation(
         cast(cast(new Timestamp(c.getTimeInMillis), StringType, timeZoneId),
           TimestampType, timeZoneId),
-        c.getTimeInMillis * 1000)
+        MILLISECONDS.toMicros(c.getTimeInMillis))
       c = Calendar.getInstance(TimeZoneGMT)
       c.set(2015, 10, 1, 2, 30, 0)
       checkEvaluation(
         cast(cast(new Timestamp(c.getTimeInMillis), StringType, timeZoneId),
           TimestampType, timeZoneId),
-        c.getTimeInMillis * 1000)
+        MILLISECONDS.toMicros(c.getTimeInMillis))
     }
 
     val gmtId = Option("GMT")
@@ -522,17 +521,17 @@ class CastSuite extends SparkFunSuite with 
ExpressionEvalHelper {
     checkEvaluation(cast(ts, FloatType), 15.003f)
     checkEvaluation(cast(ts, DoubleType), 15.003)
     checkEvaluation(cast(cast(tss, ShortType), TimestampType),
-      DateTimeUtils.fromJavaTimestamp(ts) * 1000)
+      DateTimeUtils.fromJavaTimestamp(ts) * MILLIS_PER_SECOND)
     checkEvaluation(cast(cast(tss, IntegerType), TimestampType),
-      DateTimeUtils.fromJavaTimestamp(ts) * 1000)
+      DateTimeUtils.fromJavaTimestamp(ts) * MILLIS_PER_SECOND)
     checkEvaluation(cast(cast(tss, LongType), TimestampType),
-      DateTimeUtils.fromJavaTimestamp(ts) * 1000)
+      DateTimeUtils.fromJavaTimestamp(ts) * MILLIS_PER_SECOND)
     checkEvaluation(
-      cast(cast(millis.toFloat / 1000, TimestampType), FloatType),
-      millis.toFloat / 1000)
+      cast(cast(millis.toFloat / MILLIS_PER_SECOND, TimestampType), FloatType),
+      millis.toFloat / MILLIS_PER_SECOND)
     checkEvaluation(
-      cast(cast(millis.toDouble / 1000, TimestampType), DoubleType),
-      millis.toDouble / 1000)
+      cast(cast(millis.toDouble / MILLIS_PER_SECOND, TimestampType), 
DoubleType),
+      millis.toDouble / MILLIS_PER_SECOND)
     checkEvaluation(
       cast(cast(Decimal(1), TimestampType), DecimalType.SYSTEM_DEFAULT),
       Decimal(1))
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/DateExpressionsSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/DateExpressionsSuite.scala
index ce576ec..8bec32d 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/DateExpressionsSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/DateExpressionsSuite.scala
@@ -21,6 +21,7 @@ import java.sql.{Date, Timestamp}
 import java.text.SimpleDateFormat
 import java.util.{Calendar, Locale, TimeZone}
 import java.util.concurrent.TimeUnit
+import java.util.concurrent.TimeUnit._
 
 import org.apache.spark.SparkFunSuite
 import 
org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
@@ -705,14 +706,14 @@ class DateExpressionsSuite extends SparkFunSuite with 
ExpressionEvalHelper {
           1000L)
         checkEvaluation(
           UnixTimestamp(Literal(date1), Literal("yyyy-MM-dd HH:mm:ss"), 
timeZoneId),
-          DateTimeUtils.daysToMillis(DateTimeUtils.fromJavaDate(date1), tz) / 
1000L)
+          
MILLISECONDS.toSeconds(DateTimeUtils.daysToMillis(DateTimeUtils.fromJavaDate(date1),
 tz)))
         checkEvaluation(
           UnixTimestamp(Literal(sdf2.format(new Timestamp(-1000000))), 
Literal(fmt2), timeZoneId),
           -1000L)
         checkEvaluation(UnixTimestamp(
           Literal(sdf3.format(Date.valueOf("2015-07-24"))), Literal(fmt3), 
timeZoneId),
-          DateTimeUtils.daysToMillis(
-            DateTimeUtils.fromJavaDate(Date.valueOf("2015-07-24")), tz) / 
1000L)
+          MILLISECONDS.toSeconds(DateTimeUtils.daysToMillis(
+            DateTimeUtils.fromJavaDate(Date.valueOf("2015-07-24")), tz)))
         val t1 = UnixTimestamp(
           CurrentTimestamp(), Literal("yyyy-MM-dd 
HH:mm:ss")).eval().asInstanceOf[Long]
         val t2 = UnixTimestamp(
@@ -727,7 +728,7 @@ class DateExpressionsSuite extends SparkFunSuite with 
ExpressionEvalHelper {
           null)
         checkEvaluation(
           UnixTimestamp(Literal(date1), Literal.create(null, StringType), 
timeZoneId),
-          DateTimeUtils.daysToMillis(DateTimeUtils.fromJavaDate(date1), tz) / 
1000L)
+          
MILLISECONDS.toSeconds(DateTimeUtils.daysToMillis(DateTimeUtils.fromJavaDate(date1),
 tz)))
         checkEvaluation(
           UnixTimestamp(Literal("2015-07-24"), Literal("not a valid format"), 
timeZoneId), null)
       }
@@ -759,14 +760,14 @@ class DateExpressionsSuite extends SparkFunSuite with 
ExpressionEvalHelper {
           1000L)
         checkEvaluation(
           ToUnixTimestamp(Literal(date1), Literal("yyyy-MM-dd HH:mm:ss"), 
timeZoneId),
-          DateTimeUtils.daysToMillis(DateTimeUtils.fromJavaDate(date1), tz) / 
1000L)
+          
MILLISECONDS.toSeconds(DateTimeUtils.daysToMillis(DateTimeUtils.fromJavaDate(date1),
 tz)))
         checkEvaluation(
           ToUnixTimestamp(Literal(sdf2.format(new Timestamp(-1000000))), 
Literal(fmt2), timeZoneId),
           -1000L)
         checkEvaluation(ToUnixTimestamp(
           Literal(sdf3.format(Date.valueOf("2015-07-24"))), Literal(fmt3), 
timeZoneId),
-          DateTimeUtils.daysToMillis(
-            DateTimeUtils.fromJavaDate(Date.valueOf("2015-07-24")), tz) / 
1000L)
+          MILLISECONDS.toSeconds(DateTimeUtils.daysToMillis(
+            DateTimeUtils.fromJavaDate(Date.valueOf("2015-07-24")), tz)))
         val t1 = ToUnixTimestamp(
           CurrentTimestamp(), Literal("yyyy-MM-dd 
HH:mm:ss")).eval().asInstanceOf[Long]
         val t2 = ToUnixTimestamp(
@@ -780,7 +781,7 @@ class DateExpressionsSuite extends SparkFunSuite with 
ExpressionEvalHelper {
           null)
         checkEvaluation(ToUnixTimestamp(
           Literal(date1), Literal.create(null, StringType), timeZoneId),
-          DateTimeUtils.daysToMillis(DateTimeUtils.fromJavaDate(date1), tz) / 
1000L)
+          
MILLISECONDS.toSeconds(DateTimeUtils.daysToMillis(DateTimeUtils.fromJavaDate(date1),
 tz)))
         checkEvaluation(
           ToUnixTimestamp(Literal("2015-07-24"), Literal("not a valid 
format"), timeZoneId), null)
       }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
index a7bd2ef..f6fab76 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.sql
 
 import java.io.Closeable
+import java.util.concurrent.TimeUnit._
 import java.util.concurrent.atomic.AtomicReference
 
 import scala.collection.JavaConverters._
@@ -690,7 +691,7 @@ class SparkSession private(
     val ret = f
     val end = System.nanoTime()
     // scalastyle:off println
-    println(s"Time taken: ${(end - start) / 1000 / 1000} ms")
+    println(s"Time taken: ${NANOSECONDS.toMillis(end - start)} ms")
     // scalastyle:on println
     ret
   }
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
index f852a52..3aed2ce 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
@@ -17,17 +17,18 @@
 
 package org.apache.spark.sql.execution
 
-import scala.collection.mutable.{ArrayBuffer, HashMap}
+import java.util.concurrent.TimeUnit._
+
+import scala.collection.mutable.HashMap
 
 import org.apache.commons.lang3.StringUtils
-import org.apache.hadoop.fs.{BlockLocation, FileStatus, LocatedFileStatus, 
Path}
+import org.apache.hadoop.fs.Path
 
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
 import org.apache.spark.sql.catalyst.catalog.BucketSpec
 import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext
 import org.apache.spark.sql.catalyst.plans.QueryPlan
 import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, 
Partitioning, UnknownPartitioning}
 import org.apache.spark.sql.catalyst.util.truncatedString
@@ -180,7 +181,8 @@ case class FileSourceScanExec(
     val startTime = System.nanoTime()
     val ret = relation.location.listFiles(partitionFilters, dataFilters)
     driverMetrics("numFiles") = ret.map(_.files.size.toLong).sum
-    val timeTakenMs = ((System.nanoTime() - startTime) + 
optimizerMetadataTimeNs) / 1000 / 1000
+    val timeTakenMs = NANOSECONDS.toMillis(
+      (System.nanoTime() - startTime) + optimizerMetadataTimeNs)
     driverMetrics("metadataTime") = timeTakenMs
     ret
   }
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/SortExec.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/SortExec.scala
index f1470e4..0a955d6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SortExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SortExec.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql.execution
 
+import java.util.concurrent.TimeUnit._
+
 import org.apache.spark.{SparkEnv, TaskContext}
 import org.apache.spark.executor.TaskMetrics
 import org.apache.spark.rdd.RDD
@@ -24,6 +26,7 @@ import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, 
CodeGenerator, ExprCode}
 import org.apache.spark.sql.catalyst.plans.physical._
+import org.apache.spark.sql.catalyst.util.DateTimeUtils._
 import org.apache.spark.sql.execution.metric.SQLMetrics
 
 /**
@@ -106,7 +109,7 @@ case class SortExec(
       // figure out how many bytes we spilled for this operator.
       val spillSizeBefore = metrics.memoryBytesSpilled
       val sortedIterator = sorter.sort(iter.asInstanceOf[Iterator[UnsafeRow]])
-      sortTime += sorter.getSortTimeNanos / 1000000
+      sortTime += NANOSECONDS.toMillis(sorter.getSortTimeNanos)
       peakMemory += sorter.getPeakMemoryUsage
       spillSize += metrics.memoryBytesSpilled - spillSizeBefore
       metrics.incPeakExecutionMemory(sorter.getPeakMemoryUsage)
@@ -157,7 +160,7 @@ case class SortExec(
        |   long $spillSizeBefore = $metrics.memoryBytesSpilled();
        |   $addToSorterFuncName();
        |   $sortedIterator = $sorterVariable.sort();
-       |   $sortTime.add($sorterVariable.getSortTimeNanos() / 1000000);
+       |   $sortTime.add($sorterVariable.getSortTimeNanos() / 
$NANOS_PER_MILLIS);
        |   $peakMemory.add($sorterVariable.getPeakMemoryUsage());
        |   $spillSize.add($metrics.memoryBytesSpilled() - $spillSizeBefore);
        |   
$metrics.incPeakExecutionMemory($sorterVariable.getPeakMemoryUsage());
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala
index 23ae1f0..25ff658 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql.execution.aggregate
 
+import java.util.concurrent.TimeUnit._
+
 import org.apache.spark.TaskContext
 import org.apache.spark.memory.{SparkOutOfMemoryError, TaskMemoryManager}
 import org.apache.spark.rdd.RDD
@@ -28,6 +30,7 @@ import org.apache.spark.sql.catalyst.expressions.aggregate._
 import org.apache.spark.sql.catalyst.expressions.codegen._
 import org.apache.spark.sql.catalyst.expressions.codegen.Block._
 import org.apache.spark.sql.catalyst.plans.physical._
+import org.apache.spark.sql.catalyst.util.DateTimeUtils._
 import org.apache.spark.sql.catalyst.util.truncatedString
 import org.apache.spark.sql.execution._
 import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
@@ -135,7 +138,7 @@ case class HashAggregateExec(
           aggregationIterator
         }
       }
-      aggTime += (System.nanoTime() - beforeAgg) / 1000000
+      aggTime += NANOSECONDS.toMillis(System.nanoTime() - beforeAgg)
       res
     }
   }
@@ -240,7 +243,7 @@ case class HashAggregateExec(
        |   $initAgg = true;
        |   long $beforeAgg = System.nanoTime();
        |   $doAggFuncName();
-       |   $aggTime.add((System.nanoTime() - $beforeAgg) / 1000000);
+       |   $aggTime.add((System.nanoTime() - $beforeAgg) / $NANOS_PER_MILLIS);
        |
        |   // output the result
        |   ${genResult.trim}
@@ -726,7 +729,7 @@ case class HashAggregateExec(
        $initAgg = true;
        long $beforeAgg = System.nanoTime();
        $doAggFuncName();
-       $aggTime.add((System.nanoTime() - $beforeAgg) / 1000000);
+       $aggTime.add((System.nanoTime() - $beforeAgg) / $NANOS_PER_MILLIS);
      }
 
      // output the result
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/ObjectHashAggregateExec.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/ObjectHashAggregateExec.scala
index 5b340ee..151da24 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/ObjectHashAggregateExec.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/ObjectHashAggregateExec.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql.execution.aggregate
 
+import java.util.concurrent.TimeUnit._
+
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.errors._
@@ -132,7 +134,7 @@ case class ObjectHashAggregateExec(
           aggregationIterator
         }
       }
-      aggTime += (System.nanoTime() - beforeAgg) / 1000000
+      aggTime += NANOSECONDS.toMillis(System.nanoTime() - beforeAgg)
       res
     }
   }
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
index 4352721..eacd35b 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql.execution
 
+import java.util.concurrent.TimeUnit._
+
 import scala.concurrent.{ExecutionContext, Future}
 import scala.concurrent.duration.Duration
 
@@ -684,7 +686,7 @@ case class SubqueryExec(name: String, child: SparkPlan) 
extends UnaryExecNode {
         // Note that we use .executeCollect() because we don't want to convert data to Scala types
         val rows: Array[InternalRow] = child.executeCollect()
         val beforeBuild = System.nanoTime()
-        longMetric("collectTime") += (beforeBuild - beforeCollect) / 1000000
+        longMetric("collectTime") += NANOSECONDS.toMillis(beforeBuild - 
beforeCollect)
         val dataSize = 
rows.map(_.asInstanceOf[UnsafeRow].getSizeInBytes.toLong).sum
         longMetric("dataSize") += dataSize
 
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
index 096481f..bcd8908 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.sql.execution.command
 
 import java.util.Locale
+import java.util.concurrent.TimeUnit._
 
 import scala.collection.{GenMap, GenSeq}
 import scala.collection.parallel.ForkJoinTaskSupport
@@ -739,7 +740,7 @@ case class AlterTableRecoverPartitionsCommand(
     // do this in parallel.
     val batchSize = 100
     partitionSpecsAndLocs.toIterator.grouped(batchSize).foreach { batch =>
-      val now = System.currentTimeMillis() / 1000
+      val now = MILLISECONDS.toSeconds(System.currentTimeMillis())
       val parts = batch.map { case (spec, location) =>
         val params = partitionStats.get(location.toString).map {
           case PartitionStatistics(numFiles, totalSize) =>
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala
index d55d4fa..b9972b8 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala
@@ -83,7 +83,7 @@ case class BroadcastExchangeExec(
           }
 
           val beforeBuild = System.nanoTime()
-          longMetric("collectTime") += (beforeBuild - beforeCollect) / 1000000
+          longMetric("collectTime") += NANOSECONDS.toMillis(beforeBuild - 
beforeCollect)
 
           // Construct the relation.
           val relation = mode.transform(input, Some(numRows))
@@ -105,11 +105,11 @@ case class BroadcastExchangeExec(
           }
 
           val beforeBroadcast = System.nanoTime()
-          longMetric("buildTime") += (beforeBroadcast - beforeBuild) / 1000000
+          longMetric("buildTime") += NANOSECONDS.toMillis(beforeBroadcast - 
beforeBuild)
 
           // Broadcast the relation
           val broadcasted = sparkContext.broadcast(relation)
-          longMetric("broadcastTime") += (System.nanoTime() - beforeBroadcast) 
/ 1000000
+          longMetric("broadcastTime") += 
NANOSECONDS.toMillis(System.nanoTime() - beforeBroadcast)
 
           SQLMetrics.postDriverMetricUpdates(sparkContext, executionId, 
metrics.values.toSeq)
           broadcasted
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoinExec.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoinExec.scala
index 524804d..a8361fd 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoinExec.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoinExec.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql.execution.joins
 
+import java.util.concurrent.TimeUnit._
+
 import org.apache.spark.TaskContext
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
@@ -53,7 +55,7 @@ case class ShuffledHashJoinExec(
     val start = System.nanoTime()
     val context = TaskContext.get()
     val relation = HashedRelation(iter, buildKeys, taskMemoryManager = 
context.taskMemoryManager())
-    buildTime += (System.nanoTime() - start) / 1000000
+    buildTime += NANOSECONDS.toMillis(System.nanoTime() - start)
     buildDataSize += relation.estimatedSize
     // This relation is usually used until the end of task.
     context.addTaskCompletionListener[Unit](_ => relation.close())
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/EventTimeWatermarkExec.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/EventTimeWatermarkExec.scala
index 6fa7ee0..6d1131e 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/EventTimeWatermarkExec.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/EventTimeWatermarkExec.scala
@@ -21,6 +21,7 @@ import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.{Attribute, UnsafeProjection}
 import org.apache.spark.sql.catalyst.plans.logical.EventTimeWatermark
+import org.apache.spark.sql.catalyst.util.DateTimeUtils._
 import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode}
 import org.apache.spark.sql.types.MetadataBuilder
 import org.apache.spark.unsafe.types.CalendarInterval
@@ -99,7 +100,7 @@ case class EventTimeWatermarkExec(
     child.execute().mapPartitions { iter =>
       val getEventTime = UnsafeProjection.create(eventTime :: Nil, 
child.output)
       iter.map { row =>
-        eventTimeStats.add(getEventTime(row).getLong(0) / 1000)
+        eventTimeStats.add(getEventTime(row).getLong(0) / MICROS_PER_MILLIS)
         row
       }
     }
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
index 43b70ae..cef814b 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
@@ -18,8 +18,7 @@
 package org.apache.spark.sql.execution.streaming
 
 import java.net.URI
-
-import scala.collection.JavaConverters._
+import java.util.concurrent.TimeUnit._
 
 import org.apache.hadoop.fs.{FileStatus, Path}
 
@@ -237,7 +236,7 @@ class FileStreamSource(
       (status.getPath.toUri.toString, status.getModificationTime)
     }
     val endTime = System.nanoTime
-    val listingTimeMs = (endTime.toDouble - startTime) / 1000000
+    val listingTimeMs = NANOSECONDS.toMillis(endTime - startTime)
     if (listingTimeMs > 2000) {
       // Output a warning when listing files uses more than 2 seconds.
       logWarning(s"Listed ${files.size} file(s) in $listingTimeMs ms")
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/GroupStateImpl.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/GroupStateImpl.scala
index 7f65e3e..fcb230b 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/GroupStateImpl.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/GroupStateImpl.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.sql.execution.streaming
 
 import java.sql.Date
+import java.util.concurrent.TimeUnit
 
 import org.apache.commons.lang3.StringUtils
 
@@ -178,7 +179,7 @@ private[sql] class GroupStateImpl[S] private(
       throw new IllegalArgumentException(s"Provided duration ($duration) is 
not positive")
     }
 
-    val millisPerMonth = CalendarInterval.MICROS_PER_DAY / 1000 * 31
+    val millisPerMonth = 
TimeUnit.MICROSECONDS.toMillis(CalendarInterval.MICROS_PER_DAY) * 31
     cal.milliseconds + cal.months * millisPerMonth
   }
 
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
index 2528351..859c327 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
@@ -26,7 +26,7 @@ import scala.collection.mutable
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.plans.logical.{EventTimeWatermark, 
LogicalPlan}
-import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.catalyst.util.DateTimeUtils._
 import org.apache.spark.sql.execution.QueryExecution
 import org.apache.spark.sql.execution.datasources.v2.{MicroBatchScanExec, 
StreamingDataSourceV2Relation, StreamWriterCommitProgress}
 import org.apache.spark.sql.sources.v2.reader.streaming.MicroBatchStream
@@ -87,7 +87,7 @@ trait ProgressReporter extends Logging {
   private var lastNoDataProgressEventTime = Long.MinValue
 
   private val timestampFormat = new 
SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'") // ISO8601
-  timestampFormat.setTimeZone(DateTimeUtils.getTimeZone("UTC"))
+  timestampFormat.setTimeZone(getTimeZone("UTC"))
 
   @volatile
   protected var currentStatus: StreamingQueryStatus = {
@@ -147,10 +147,10 @@ trait ProgressReporter extends Logging {
 
     val executionStats = extractExecutionStats(hasNewData)
     val processingTimeSec =
-      (currentTriggerEndTimestamp - currentTriggerStartTimestamp).toDouble / 
1000
+      (currentTriggerEndTimestamp - currentTriggerStartTimestamp).toDouble / 
MILLIS_PER_SECOND
 
     val inputTimeSec = if (lastTriggerStartTimestamp >= 0) {
-      (currentTriggerStartTimestamp - lastTriggerStartTimestamp).toDouble / 
1000
+      (currentTriggerStartTimestamp - lastTriggerStartTimestamp).toDouble / 
MILLIS_PER_SECOND
     } else {
       Double.NaN
     }
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousTrigger.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousTrigger.scala
index caffcc3..fd0ff31 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousTrigger.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousTrigger.scala
@@ -53,7 +53,7 @@ private[sql] object ContinuousTrigger {
     if (cal.months > 0) {
       throw new IllegalArgumentException(s"Doesn't support month or year 
interval: $interval")
     }
-    new ContinuousTrigger(cal.microseconds / 1000)
+    new ContinuousTrigger(TimeUnit.MICROSECONDS.toMillis(cal.microseconds))
   }
 
   def apply(interval: Duration): ContinuousTrigger = {
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/streaming/ProcessingTime.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/streaming/ProcessingTime.scala
index 236bd55..38b0776 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/streaming/ProcessingTime.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/streaming/ProcessingTime.scala
@@ -91,7 +91,7 @@ object ProcessingTime {
     if (cal.months > 0) {
       throw new IllegalArgumentException(s"Doesn't support month or year 
interval: $interval")
     }
-    new ProcessingTime(cal.microseconds / 1000)
+    new ProcessingTime(TimeUnit.MICROSECONDS.toMillis(cal.microseconds))
   }
 
   /**
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/DateFunctionsSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/DateFunctionsSuite.scala
index 62bb72d..b06d52d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DateFunctionsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DateFunctionsSuite.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql
 import java.sql.{Date, Timestamp}
 import java.text.SimpleDateFormat
 import java.util.Locale
+import java.util.concurrent.TimeUnit
 
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.functions._
@@ -515,6 +516,8 @@ class DateFunctionsSuite extends QueryTest with 
SharedSQLContext {
       Seq(Row(sdf3.format(new Timestamp(1000000))), Row(sdf3.format(new 
Timestamp(-1000000)))))
   }
 
+  private def secs(millis: Long): Long = 
TimeUnit.MILLISECONDS.toSeconds(millis)
+
   test("unix_timestamp") {
     val date1 = Date.valueOf("2015-07-24")
     val date2 = Date.valueOf("2015-07-25")
@@ -527,21 +530,21 @@ class DateFunctionsSuite extends QueryTest with SharedSQLContext {
     val fmt = "yyyy/MM/dd HH:mm:ss.S"
     val df = Seq((date1, ts1, s1, ss1), (date2, ts2, s2, ss2)).toDF("d", "ts", "s", "ss")
     checkAnswer(df.select(unix_timestamp(col("ts"))), Seq(
-      Row(ts1.getTime / 1000L), Row(ts2.getTime / 1000L)))
+      Row(secs(ts1.getTime)), Row(secs(ts2.getTime))))
     checkAnswer(df.select(unix_timestamp(col("ss"))), Seq(
-      Row(ts1.getTime / 1000L), Row(ts2.getTime / 1000L)))
+      Row(secs(ts1.getTime)), Row(secs(ts2.getTime))))
     checkAnswer(df.select(unix_timestamp(col("d"), fmt)), Seq(
-      Row(date1.getTime / 1000L), Row(date2.getTime / 1000L)))
+      Row(secs(date1.getTime)), Row(secs(date2.getTime))))
     checkAnswer(df.select(unix_timestamp(col("s"), fmt)), Seq(
-      Row(ts1.getTime / 1000L), Row(ts2.getTime / 1000L)))
+      Row(secs(ts1.getTime)), Row(secs(ts2.getTime))))
     checkAnswer(df.selectExpr("unix_timestamp(ts)"), Seq(
-      Row(ts1.getTime / 1000L), Row(ts2.getTime / 1000L)))
+      Row(secs(ts1.getTime)), Row(secs(ts2.getTime))))
     checkAnswer(df.selectExpr("unix_timestamp(ss)"), Seq(
-      Row(ts1.getTime / 1000L), Row(ts2.getTime / 1000L)))
+      Row(secs(ts1.getTime)), Row(secs(ts2.getTime))))
     checkAnswer(df.selectExpr(s"unix_timestamp(d, '$fmt')"), Seq(
-      Row(date1.getTime / 1000L), Row(date2.getTime / 1000L)))
+      Row(secs(date1.getTime)), Row(secs(date2.getTime))))
     checkAnswer(df.selectExpr(s"unix_timestamp(s, '$fmt')"), Seq(
-      Row(ts1.getTime / 1000L), Row(ts2.getTime / 1000L)))
+      Row(secs(ts1.getTime)), Row(secs(ts2.getTime))))
 
     val x1 = "2015-07-24 10:00:00"
     val x2 = "2015-25-07 02:02:02"
@@ -552,13 +555,13 @@ class DateFunctionsSuite extends QueryTest with SharedSQLContext {
 
     val df1 = Seq(x1, x2, x3, x4).toDF("x")
     checkAnswer(df1.select(unix_timestamp(col("x"))), Seq(
-      Row(ts1.getTime / 1000L), Row(null), Row(null), Row(null)))
+      Row(secs(ts1.getTime)), Row(null), Row(null), Row(null)))
     checkAnswer(df1.selectExpr("unix_timestamp(x)"), Seq(
-      Row(ts1.getTime / 1000L), Row(null), Row(null), Row(null)))
+      Row(secs(ts1.getTime)), Row(null), Row(null), Row(null)))
     checkAnswer(df1.select(unix_timestamp(col("x"), "yyyy-dd-MM HH:mm:ss")), Seq(
-      Row(null), Row(ts2.getTime / 1000L), Row(null), Row(null)))
+      Row(null), Row(secs(ts2.getTime)), Row(null), Row(null)))
     checkAnswer(df1.selectExpr(s"unix_timestamp(x, 'yyyy-MM-dd mm:HH:ss')"), Seq(
-      Row(ts4.getTime / 1000L), Row(null), Row(ts3.getTime / 1000L), Row(null)))
+      Row(secs(ts4.getTime)), Row(null), Row(secs(ts3.getTime)), Row(null)))
 
     // invalid format
     checkAnswer(df1.selectExpr(s"unix_timestamp(x, 'yyyy-MM-dd aa:HH:ss')"), Seq(
@@ -570,10 +573,12 @@ class DateFunctionsSuite extends QueryTest with SharedSQLContext {
     val ts5 = Timestamp.valueOf("2016-02-29 00:00:00")
     val df2 = Seq(y1, y2).toDF("y")
     checkAnswer(df2.select(unix_timestamp(col("y"), "yyyy-MM-dd")), Seq(
-      Row(ts5.getTime / 1000L), Row(null)))
+      Row(secs(ts5.getTime)), Row(null)))
 
     val now = sql("select unix_timestamp()").collect().head.getLong(0)
-    checkAnswer(sql(s"select cast ($now as timestamp)"), Row(new java.util.Date(now * 1000)))
+    checkAnswer(
+      sql(s"select cast ($now as timestamp)"),
+      Row(new java.util.Date(TimeUnit.SECONDS.toMillis(now))))
   }
 
   test("to_unix_timestamp") {
@@ -588,13 +593,13 @@ class DateFunctionsSuite extends QueryTest with SharedSQLContext {
     val fmt = "yyyy/MM/dd HH:mm:ss.S"
     val df = Seq((date1, ts1, s1, ss1), (date2, ts2, s2, ss2)).toDF("d", "ts", "s", "ss")
     checkAnswer(df.selectExpr("to_unix_timestamp(ts)"), Seq(
-      Row(ts1.getTime / 1000L), Row(ts2.getTime / 1000L)))
+      Row(secs(ts1.getTime)), Row(secs(ts2.getTime))))
     checkAnswer(df.selectExpr("to_unix_timestamp(ss)"), Seq(
-      Row(ts1.getTime / 1000L), Row(ts2.getTime / 1000L)))
+      Row(secs(ts1.getTime)), Row(secs(ts2.getTime))))
     checkAnswer(df.selectExpr(s"to_unix_timestamp(d, '$fmt')"), Seq(
-      Row(date1.getTime / 1000L), Row(date2.getTime / 1000L)))
+      Row(secs(date1.getTime)), Row(secs(date2.getTime))))
     checkAnswer(df.selectExpr(s"to_unix_timestamp(s, '$fmt')"), Seq(
-      Row(ts1.getTime / 1000L), Row(ts2.getTime / 1000L)))
+      Row(secs(ts1.getTime)), Row(secs(ts2.getTime))))
 
     val x1 = "2015-07-24 10:00:00"
     val x2 = "2015-25-07 02:02:02"
@@ -605,9 +610,9 @@ class DateFunctionsSuite extends QueryTest with SharedSQLContext {
 
     val df1 = Seq(x1, x2, x3, x4).toDF("x")
     checkAnswer(df1.selectExpr("to_unix_timestamp(x)"), Seq(
-      Row(ts1.getTime / 1000L), Row(null), Row(null), Row(null)))
+      Row(secs(ts1.getTime)), Row(null), Row(null), Row(null)))
     checkAnswer(df1.selectExpr(s"to_unix_timestamp(x, 'yyyy-MM-dd mm:HH:ss')"), Seq(
-      Row(ts4.getTime / 1000L), Row(null), Row(ts3.getTime / 1000L), Row(null)))
+      Row(secs(ts4.getTime)), Row(null), Row(secs(ts3.getTime)), Row(null)))
 
     // february
     val y1 = "2016-02-29"
@@ -615,7 +620,7 @@ class DateFunctionsSuite extends QueryTest with SharedSQLContext {
     val ts5 = Timestamp.valueOf("2016-02-29 00:00:00")
     val df2 = Seq(y1, y2).toDF("y")
     checkAnswer(df2.select(unix_timestamp(col("y"), "yyyy-MM-dd")), Seq(
-      Row(ts5.getTime / 1000L), Row(null)))
+      Row(secs(ts5.getTime)), Row(null)))
 
     // invalid format
     checkAnswer(df1.selectExpr(s"to_unix_timestamp(x, 'yyyy-MM-dd bb:HH:ss')"), Seq(
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketStreamSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketStreamSuite.scala
index 33c65d7..e1769fb 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketStreamSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketStreamSuite.scala
@@ -22,6 +22,7 @@ import java.nio.ByteBuffer
 import java.nio.channels.ServerSocketChannel
 import java.sql.Timestamp
 import java.util.concurrent.LinkedBlockingQueue
+import java.util.concurrent.TimeUnit._
 
 import scala.collection.JavaConverters._
 
@@ -29,7 +30,6 @@ import org.scalatest.BeforeAndAfterEach
 
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.AnalysisException
-import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.execution.datasources.DataSource
 import org.apache.spark.sql.execution.datasources.v2.StreamingDataSourceV2Relation
 import org.apache.spark.sql.execution.streaming._
@@ -168,7 +168,7 @@ class TextSocketStreamSuite extends StreamTest with SharedSQLContext with Before
       // Timestamp for rate stream is round to second which leads to milliseconds lost, that will
       // make batch1stamp smaller than current timestamp if both of them are in the same second.
       // Comparing by second to make sure the correct behavior.
-      assert(batch1Stamp.getTime >= curr / 1000 * 1000)
+      assert(batch1Stamp.getTime >= SECONDS.toMillis(MILLISECONDS.toSeconds(curr)))
       assert(!batch2Stamp.before(batch1Stamp))
     }
   }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala
index b79770a..1ff9dec 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala
@@ -21,6 +21,7 @@ import java.{util => ju}
 import java.io.File
 import java.text.SimpleDateFormat
 import java.util.{Calendar, Date, Locale}
+import java.util.concurrent.TimeUnit._
 
 import org.apache.commons.io.FileUtils
 import org.scalatest.{BeforeAndAfter, Matchers}
@@ -28,7 +29,6 @@ import org.scalatest.{BeforeAndAfter, Matchers}
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.{AnalysisException, Dataset}
 import org.apache.spark.sql.catalyst.plans.logical.EventTimeWatermark
-import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.execution.streaming._
 import org.apache.spark.sql.functions.{count, window}
 import org.apache.spark.sql.internal.SQLConf
@@ -347,12 +347,13 @@ class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Matche
     }
 
     testStream(aggWithWatermark)(
-      AddData(input, currentTimeMs / 1000),
+      AddData(input, MILLISECONDS.toSeconds(currentTimeMs)),
       CheckAnswer(),
-      AddData(input, currentTimeMs / 1000),
+      AddData(input, MILLISECONDS.toSeconds(currentTimeMs)),
       CheckAnswer(),
       assertEventStats { e =>
-        assert(timestampFormat.parse(e.get("max")).getTime === (currentTimeMs / 1000) * 1000)
+        assert(timestampFormat.parse(e.get("max")).getTime ===
+          SECONDS.toMillis(MILLISECONDS.toSeconds((currentTimeMs))))
         val watermarkTime = timestampFormat.parse(e.get("watermark"))
         val monthDiff = monthsSinceEpoch(currentTime) - monthsSinceEpoch(watermarkTime)
         // monthsSinceEpoch is like `math.floor(num)`, so monthDiff has two possible values.
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala
index 4dec2f7..178fced 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.sql.hive
 
 import java.lang.reflect.{ParameterizedType, Type, WildcardType}
+import java.util.concurrent.TimeUnit._
 
 import scala.collection.JavaConverters._
 
@@ -460,7 +461,7 @@ private[hive] trait HiveInspectors {
         _ => constant
       case poi: WritableConstantTimestampObjectInspector =>
         val t = poi.getWritableConstantValue
-        val constant = t.getSeconds * 1000000L + t.getNanos / 1000L
+        val constant = SECONDS.toMicros(t.getSeconds) + NANOSECONDS.toMicros(t.getNanos)
         _ => constant
       case poi: WritableConstantIntObjectInspector =>
         val constant = poi.getWritableConstantValue.get()
@@ -629,7 +630,7 @@ private[hive] trait HiveInspectors {
           data: Any => {
             if (data != null) {
               val t = x.getPrimitiveWritableObject(data)
-              t.getSeconds * 1000000L + t.getNanos / 1000L
+              SECONDS.toMicros(t.getSeconds) + NANOSECONDS.toMicros(t.getNanos)
             } else {
               null
             }
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
index bfe19c2..77ac606 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.hive.client
 import java.io.{File, PrintStream}
 import java.lang.{Iterable => JIterable}
 import java.util.{Locale, Map => JMap}
+import java.util.concurrent.TimeUnit._
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable
@@ -948,8 +949,8 @@ private[hive] object HiveClientImpl {
     hiveTable.setFields(schema.asJava)
     hiveTable.setPartCols(partCols.asJava)
     userName.foreach(hiveTable.setOwner)
-    hiveTable.setCreateTime((table.createTime / 1000).toInt)
-    hiveTable.setLastAccessTime((table.lastAccessTime / 1000).toInt)
+    hiveTable.setCreateTime(MILLISECONDS.toSeconds(table.createTime).toInt)
+    hiveTable.setLastAccessTime(MILLISECONDS.toSeconds(table.lastAccessTime).toInt)
     table.storage.locationUri.map(CatalogUtils.URIToString).foreach { loc =>
       hiveTable.getTTable.getSd.setLocation(loc)}
     table.storage.inputFormat.map(toInputFormat).foreach(hiveTable.setInputFormatClass)
@@ -1012,8 +1013,8 @@ private[hive] object HiveClientImpl {
     tpart.setTableName(ht.getTableName)
     tpart.setValues(partValues.asJava)
     tpart.setSd(storageDesc)
-    tpart.setCreateTime((p.createTime / 1000).toInt)
-    tpart.setLastAccessTime((p.lastAccessTime / 1000).toInt)
+    tpart.setCreateTime(MILLISECONDS.toSeconds(p.createTime).toInt)
+    tpart.setLastAccessTime(MILLISECONDS.toSeconds(p.lastAccessTime).toInt)
     tpart.setParameters(mutable.Map(p.parameters.toSeq: _*).asJava)
     new HivePartition(ht, tpart)
   }
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
index a8ebb23..af5ea59 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
@@ -516,7 +516,7 @@ private[client] class Shim_v0_13 extends Shim_v0_12 {
       f.className,
       null,
       PrincipalType.USER,
-      (System.currentTimeMillis / 1000).toInt,
+      TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis).toInt,
       FunctionType.JAVA,
       resourceUris.asJava)
   }
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/RateLimitedOutputStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/RateLimitedOutputStream.scala
index 29cc1fa..342f20f 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/util/RateLimitedOutputStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/util/RateLimitedOutputStream.scala
@@ -79,7 +79,7 @@ class RateLimitedOutputStream(out: OutputStream, desiredBytesPerSec: Int)
     } else {
       // Calculate how much time we should sleep to bring ourselves to the desired rate.
       val targetTimeInMillis = bytesWrittenSinceSync * 1000 / desiredBytesPerSec
-      val elapsedTimeInMillis = elapsedNanosecs / 1000000
+      val elapsedTimeInMillis = NANOSECONDS.toMillis(elapsedNanosecs)
       val sleepTimeInMillis = targetTimeInMillis - elapsedTimeInMillis
       if (sleepTimeInMillis > 0) {
         logTrace("Natural rate is " + rate + " per second but desired rate is " +


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to