Repository: spark
Updated Branches:
  refs/heads/master b928f5438 -> 37719e0cd


[SPARK-8189] [SQL] use Long for TimestampType in SQL

This PR changes the internal type of TimestampType to Long (the number of 100ns
intervals since the epoch) for efficiency, which means it will lose the precision
below 100ns.
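
For reference, a minimal sketch of the new representation (not part of the patch;
the names are illustrative and mirror the DateUtils.fromJavaTimestamp /
DateUtils.toJavaTimestamp helpers added below):

    import java.sql.Timestamp

    // A TimestampType value is now a Long counting 100ns intervals since
    // 1970-01-01 00:00:00 UTC, so one second is 10,000,000 units.
    def toLong100ns(t: Timestamp): Long =
      t.getTime * 10000L + (t.getNanos.toLong / 100) % 10000L

    def fromLong100ns(num100ns: Long): Timestamp = {
      var seconds = num100ns / 10000000L
      var nanos = num100ns % 10000000L
      if (nanos < 0) {          // setNanos() cannot take a negative value
        nanos += 10000000L
        seconds -= 1
      }
      val t = new Timestamp(seconds * 1000)
      t.setNanos(nanos.toInt * 100)
      t
    }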

Author: Davies Liu <dav...@databricks.com>

Closes #6733 from davies/timestamp and squashes the following commits:

d9565fa [Davies Liu] remove print
65cf2f1 [Davies Liu] fix Timestamp in SparkR
86fecfb [Davies Liu] disable two timestamp tests
8f77ee0 [Davies Liu] fix scala style
246ee74 [Davies Liu] address comments
309d2e1 [Davies Liu] use Long for TimestampType in SQL


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/37719e0c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/37719e0c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/37719e0c

Branch: refs/heads/master
Commit: 37719e0cd0b00cc5ffee0ebe1652d465a574db0f
Parents: b928f54
Author: Davies Liu <dav...@databricks.com>
Authored: Wed Jun 10 16:55:39 2015 -0700
Committer: Reynold Xin <r...@databricks.com>
Committed: Wed Jun 10 16:55:39 2015 -0700

----------------------------------------------------------------------
 .../scala/org/apache/spark/api/r/SerDe.scala    | 17 ++++--
 python/pyspark/sql/types.py                     | 11 ++++
 .../scala/org/apache/spark/sql/BaseRow.java     |  6 ++
 .../main/scala/org/apache/spark/sql/Row.scala   |  8 ++-
 .../sql/catalyst/CatalystTypeConverters.scala   | 13 +++-
 .../spark/sql/catalyst/expressions/Cast.scala   | 62 +++++++++-----------
 .../expressions/SpecificMutableRow.scala        |  1 +
 .../expressions/codegen/CodeGenerator.scala     |  4 +-
 .../codegen/GenerateProjection.scala            | 10 +++-
 .../sql/catalyst/expressions/literals.scala     | 15 +++--
 .../sql/catalyst/expressions/predicates.scala   |  6 +-
 .../spark/sql/catalyst/util/DateUtils.scala     | 44 +++++++++++---
 .../apache/spark/sql/types/TimestampType.scala  | 10 +---
 .../sql/catalyst/expressions/CastSuite.scala    | 11 ++--
 .../sql/catalyst/util/DateUtilsSuite.scala      | 40 +++++++++++++
 .../apache/spark/sql/types/DataTypeSuite.scala  |  2 +-
 .../apache/spark/sql/columnar/ColumnStats.scala | 21 +------
 .../apache/spark/sql/columnar/ColumnType.scala  | 19 +++---
 .../sql/execution/SparkSqlSerializer2.scala     | 17 ++----
 .../spark/sql/execution/debug/package.scala     |  2 +
 .../apache/spark/sql/execution/pythonUdfs.scala |  7 ++-
 .../org/apache/spark/sql/jdbc/JDBCRDD.scala     | 10 +++-
 .../apache/spark/sql/json/JacksonParser.scala   |  5 +-
 .../org/apache/spark/sql/json/JsonRDD.scala     | 10 ++--
 .../spark/sql/parquet/ParquetConverter.scala    |  9 +--
 .../spark/sql/parquet/ParquetTableSupport.scala | 10 ++--
 .../org/apache/spark/sql/CachedTableSuite.scala |  2 +-
 .../spark/sql/columnar/ColumnStatsSuite.scala   |  2 +-
 .../spark/sql/columnar/ColumnTypeSuite.scala    | 11 ++--
 .../spark/sql/columnar/ColumnarTestUtils.scala  |  9 +--
 .../org/apache/spark/sql/jdbc/JDBCSuite.scala   |  2 +-
 .../org/apache/spark/sql/json/JsonSuite.scala   | 14 +++--
 .../hive/execution/HiveCompatibilitySuite.scala |  8 ++-
 .../apache/spark/sql/hive/HiveInspectors.scala  | 20 ++++---
 .../org/apache/spark/sql/hive/TableReader.scala |  4 +-
 ...p cast #5-0-dbd7bcd167d322d6617b884c02c7f247 |  2 +-
 36 files changed, 272 insertions(+), 172 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/37719e0c/core/src/main/scala/org/apache/spark/api/r/SerDe.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/r/SerDe.scala 
b/core/src/main/scala/org/apache/spark/api/r/SerDe.scala
index f8e3f1a..56adc85 100644
--- a/core/src/main/scala/org/apache/spark/api/r/SerDe.scala
+++ b/core/src/main/scala/org/apache/spark/api/r/SerDe.scala
@@ -18,7 +18,7 @@
 package org.apache.spark.api.r
 
 import java.io.{DataInputStream, DataOutputStream}
-import java.sql.{Date, Time}
+import java.sql.{Timestamp, Date, Time}
 
 import scala.collection.JavaConversions._
 
@@ -107,9 +107,12 @@ private[spark] object SerDe {
     Date.valueOf(readString(in))
   }
 
-  def readTime(in: DataInputStream): Time = {
-    val t = in.readDouble()
-    new Time((t * 1000L).toLong)
+  def readTime(in: DataInputStream): Timestamp = {
+    val seconds = in.readDouble()
+    val sec = Math.floor(seconds).toLong
+    val t = new Timestamp(sec * 1000L)
+    t.setNanos(((seconds - sec) * 1e9).toInt)
+    t
   }
 
   def readBytesArr(in: DataInputStream): Array[Array[Byte]] = {
@@ -227,6 +230,9 @@ private[spark] object SerDe {
         case "java.sql.Time" =>
           writeType(dos, "time")
           writeTime(dos, value.asInstanceOf[Time])
+        case "java.sql.Timestamp" =>
+          writeType(dos, "time")
+          writeTime(dos, value.asInstanceOf[Timestamp])
         case "[B" =>
           writeType(dos, "raw")
           writeBytes(dos, value.asInstanceOf[Array[Byte]])
@@ -289,6 +295,9 @@ private[spark] object SerDe {
     out.writeDouble(value.getTime.toDouble / 1000.0)
   }
 
+  def writeTime(out: DataOutputStream, value: Timestamp): Unit = {
+    out.writeDouble((value.getTime / 1000).toDouble + value.getNanos.toDouble 
/ 1e9)
+  }
 
   // NOTE: Only works for ASCII right now
   def writeString(out: DataOutputStream, value: String): Unit = {

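The R bridge above exchanges timestamps as a double of (possibly fractional)
seconds since the epoch. A rough sketch of that round trip, matching writeTime
and readTime above (illustrative only; double precision limits this to roughly
sub-microsecond resolution for present-day dates):

    import java.sql.Timestamp

    // encode: whole seconds plus the nanosecond fraction, as a double
    def encodeForR(t: Timestamp): Double =
      (t.getTime / 1000).toDouble + t.getNanos.toDouble / 1e9

    // decode: split the double back into seconds and nanoseconds
    def decodeFromR(seconds: Double): Timestamp = {
      val sec = math.floor(seconds).toLong
      val t = new Timestamp(sec * 1000L)
      t.setNanos(((seconds - sec) * 1e9).toInt)
      t
    }
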
http://git-wip-us.apache.org/repos/asf/spark/blob/37719e0c/python/pyspark/sql/types.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/types.py b/python/pyspark/sql/types.py
index b6ec613..8f286b6 100644
--- a/python/pyspark/sql/types.py
+++ b/python/pyspark/sql/types.py
@@ -19,6 +19,7 @@ import sys
 import decimal
 import time
 import datetime
+import calendar
 import keyword
 import warnings
 import json
@@ -654,6 +655,8 @@ def _need_python_to_sql_conversion(dataType):
             _need_python_to_sql_conversion(dataType.valueType)
     elif isinstance(dataType, UserDefinedType):
         return True
+    elif isinstance(dataType, TimestampType):
+        return True
     else:
         return False
 
@@ -707,6 +710,14 @@ def _python_to_sql_converter(dataType):
         return lambda m: dict([(key_converter(k), value_converter(v)) for k, v 
in m.items()])
     elif isinstance(dataType, UserDefinedType):
         return lambda obj: dataType.serialize(obj)
+    elif isinstance(dataType, TimestampType):
+
+        def to_posix_timstamp(dt):
+            if dt.tzinfo is None:
+                return int(time.mktime(dt.timetuple()) * 1e7 + dt.microsecond 
* 10)
+            else:
+                return int(calendar.timegm(dt.utctimetuple()) * 1e7 + 
dt.microsecond * 10)
+        return to_posix_timstamp
     else:
         raise ValueError("Unexpected type %r" % dataType)
 

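The converter above produces the same 100ns unit used on the JVM side: whole
seconds scaled by 10,000,000 plus microseconds scaled by 10. For example, a
timezone-aware datetime of 1970-01-01 00:00:01.000002 UTC becomes

    1 * 10,000,000 + 2 * 10 = 10,000,020   (units of 100ns)

(for a naive datetime the first term comes from time.mktime, i.e. it is
interpreted in the local timezone).
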
http://git-wip-us.apache.org/repos/asf/spark/blob/37719e0c/sql/catalyst/src/main/scala/org/apache/spark/sql/BaseRow.java
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/BaseRow.java 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/BaseRow.java
index d138b43..6584882 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/BaseRow.java
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/BaseRow.java
@@ -19,6 +19,7 @@ package org.apache.spark.sql;
 
 import java.math.BigDecimal;
 import java.sql.Date;
+import java.sql.Timestamp;
 import java.util.List;
 
 import scala.collection.Seq;
@@ -104,6 +105,11 @@ public abstract class BaseRow implements Row {
   }
 
   @Override
+  public Timestamp getTimestamp(int i) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
   public <T> Seq<T> getSeq(int i) {
     throw new UnsupportedOperationException();
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/37719e0c/sql/catalyst/src/main/scala/org/apache/spark/sql/Row.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/Row.scala 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/Row.scala
index 0d460b6..8aaf5d7 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/Row.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/Row.scala
@@ -260,10 +260,16 @@ trait Row extends Serializable {
    *
    * @throws ClassCastException when data type does not match.
    */
-  // TODO(davies): This is not the right default implementation, we use Int as 
Date internally
   def getDate(i: Int): java.sql.Date = apply(i).asInstanceOf[java.sql.Date]
 
   /**
+   * Returns the value at position i of date type as java.sql.Timestamp.
+   *
+   * @throws ClassCastException when data type does not match.
+   */
+  def getTimestamp(i: Int): java.sql.Timestamp = 
apply(i).asInstanceOf[java.sql.Timestamp]
+
+  /**
    * Returns the value at position i of array type as a Scala Seq.
    *
    * @throws ClassCastException when data type does not match.

http://git-wip-us.apache.org/repos/asf/spark/blob/37719e0c/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala
index 2e7b4c2..beb82db 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.catalyst
 
 import java.lang.{Iterable => JavaIterable}
 import java.math.{BigDecimal => JavaBigDecimal}
-import java.sql.Date
+import java.sql.{Timestamp, Date}
 import java.util.{Map => JavaMap}
 import javax.annotation.Nullable
 
@@ -58,6 +58,7 @@ object CatalystTypeConverters {
       case structType: StructType => StructConverter(structType)
       case StringType => StringConverter
       case DateType => DateConverter
+      case TimestampType => TimestampConverter
       case dt: DecimalType => BigDecimalConverter
       case BooleanType => BooleanConverter
       case ByteType => ByteConverter
@@ -274,6 +275,15 @@ object CatalystTypeConverters {
     override def toScalaImpl(row: Row, column: Int): Date = 
toScala(row.getInt(column))
   }
 
+  private object TimestampConverter extends CatalystTypeConverter[Timestamp, 
Timestamp, Any] {
+    override def toCatalystImpl(scalaValue: Timestamp): Long =
+      DateUtils.fromJavaTimestamp(scalaValue)
+    override def toScala(catalystValue: Any): Timestamp =
+      if (catalystValue == null) null
+      else DateUtils.toJavaTimestamp(catalystValue.asInstanceOf[Long])
+    override def toScalaImpl(row: Row, column: Int): Timestamp = 
toScala(row.getLong(column))
+  }
+
   private object BigDecimalConverter extends CatalystTypeConverter[Any, 
JavaBigDecimal, Decimal] {
     override def toCatalystImpl(scalaValue: Any): Decimal = scalaValue match {
       case d: BigDecimal => Decimal(d)
@@ -367,6 +377,7 @@ object CatalystTypeConverters {
   def convertToCatalyst(a: Any): Any = a match {
     case s: String => StringConverter.toCatalyst(s)
     case d: Date => DateConverter.toCatalyst(d)
+    case t: Timestamp => TimestampConverter.toCatalyst(t)
     case d: BigDecimal => BigDecimalConverter.toCatalyst(d)
     case d: JavaBigDecimal => BigDecimalConverter.toCatalyst(d)
     case seq: Seq[Any] => seq.map(convertToCatalyst)

http://git-wip-us.apache.org/repos/asf/spark/blob/37719e0c/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala
index 18102d1..8d93957 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala
@@ -113,7 +113,8 @@ case class Cast(child: Expression, dataType: DataType) 
extends UnaryExpression w
   private[this] def castToString(from: DataType): Any => Any = from match {
     case BinaryType => buildCast[Array[Byte]](_, UTF8String(_))
     case DateType => buildCast[Int](_, d => UTF8String(DateUtils.toString(d)))
-    case TimestampType => buildCast[Timestamp](_, t => 
UTF8String(timestampToString(t)))
+    case TimestampType => buildCast[Long](_,
+      t => UTF8String(timestampToString(DateUtils.toJavaTimestamp(t))))
     case _ => buildCast[Any](_, o => UTF8String(o.toString))
   }
 
@@ -127,7 +128,7 @@ case class Cast(child: Expression, dataType: DataType) 
extends UnaryExpression w
     case StringType =>
       buildCast[UTF8String](_, _.length() != 0)
     case TimestampType =>
-      buildCast[Timestamp](_, t => t.getTime() != 0 || t.getNanos() != 0)
+      buildCast[Long](_, t => t != 0)
     case DateType =>
       // Hive would return null when cast from date to boolean
       buildCast[Int](_, d => null)
@@ -158,20 +159,21 @@ case class Cast(child: Expression, dataType: DataType) 
extends UnaryExpression w
         if (periodIdx != -1 && n.length() - periodIdx > 9) {
           n = n.substring(0, periodIdx + 10)
         }
-        try Timestamp.valueOf(n) catch { case _: 
java.lang.IllegalArgumentException => null }
+        try DateUtils.fromJavaTimestamp(Timestamp.valueOf(n))
+        catch { case _: java.lang.IllegalArgumentException => null }
       })
     case BooleanType =>
-      buildCast[Boolean](_, b => new Timestamp(if (b) 1 else 0))
+      buildCast[Boolean](_, b => (if (b) 1L else 0))
     case LongType =>
-      buildCast[Long](_, l => new Timestamp(l))
+      buildCast[Long](_, l => longToTimestamp(l))
     case IntegerType =>
-      buildCast[Int](_, i => new Timestamp(i))
+      buildCast[Int](_, i => longToTimestamp(i.toLong))
     case ShortType =>
-      buildCast[Short](_, s => new Timestamp(s))
+      buildCast[Short](_, s => longToTimestamp(s.toLong))
     case ByteType =>
-      buildCast[Byte](_, b => new Timestamp(b))
+      buildCast[Byte](_, b => longToTimestamp(b.toLong))
     case DateType =>
-      buildCast[Int](_, d => new Timestamp(DateUtils.toJavaDate(d).getTime))
+      buildCast[Int](_, d => DateUtils.toMillisSinceEpoch(d) * 10000)
     // TimestampWritable.decimalToTimestamp
     case DecimalType() =>
       buildCast[Decimal](_, d => decimalToTimestamp(d))
@@ -191,25 +193,17 @@ case class Cast(child: Expression, dataType: DataType) 
extends UnaryExpression w
       })
   }
 
-  private[this] def decimalToTimestamp(d: Decimal) = {
-    val seconds = Math.floor(d.toDouble).toLong
-    val bd = (d.toBigDecimal - seconds) * 1000000000
-    val nanos = bd.intValue()
-
-    val millis = seconds * 1000
-    val t = new Timestamp(millis)
-
-    // remaining fractional portion as nanos
-    t.setNanos(nanos)
-    t
+  private[this] def decimalToTimestamp(d: Decimal): Long = {
+    (d.toBigDecimal * 10000000L).longValue()
   }
 
-  // Timestamp to long, converting milliseconds to seconds
-  private[this] def timestampToLong(ts: Timestamp) = Math.floor(ts.getTime / 
1000.0).toLong
-
-  private[this] def timestampToDouble(ts: Timestamp) = {
-    // First part is the seconds since the beginning of time, followed by 
nanosecs.
-    Math.floor(ts.getTime / 1000.0).toLong + ts.getNanos.toDouble / 1000000000
+  // converting milliseconds to 100ns
+  private[this] def longToTimestamp(t: Long): Long = t * 10000L
+  // converting 100ns to seconds
+  private[this] def timestampToLong(ts: Long): Long = math.floor(ts.toDouble / 
10000000L).toLong
+  // converting 100ns to seconds in double
+  private[this] def timestampToDouble(ts: Long): Double = {
+    ts / 10000000.0
   }
 
   // Converts Timestamp to string according to Hive TimestampWritable 
convention
@@ -234,7 +228,7 @@ case class Cast(child: Expression, dataType: DataType) 
extends UnaryExpression w
     case TimestampType =>
       // throw valid precision more than seconds, according to Hive.
       // Timestamp.nanos is in 0 to 999,999,999, no more than a second.
-      buildCast[Timestamp](_, t => DateUtils.millisToDays(t.getTime))
+      buildCast[Long](_, t => DateUtils.millisToDays(t / 10000L))
     // Hive throws this exception as a Semantic Exception
     // It is never possible to compare result when hive return with exception,
     // so we can return null
@@ -253,7 +247,7 @@ case class Cast(child: Expression, dataType: DataType) 
extends UnaryExpression w
     case DateType =>
       buildCast[Int](_, d => null)
     case TimestampType =>
-      buildCast[Timestamp](_, t => timestampToLong(t))
+      buildCast[Long](_, t => timestampToLong(t))
     case x: NumericType =>
       b => x.numeric.asInstanceOf[Numeric[Any]].toLong(b)
   }
@@ -269,7 +263,7 @@ case class Cast(child: Expression, dataType: DataType) 
extends UnaryExpression w
     case DateType =>
       buildCast[Int](_, d => null)
     case TimestampType =>
-      buildCast[Timestamp](_, t => timestampToLong(t).toInt)
+      buildCast[Long](_, t => timestampToLong(t).toInt)
     case x: NumericType =>
       b => x.numeric.asInstanceOf[Numeric[Any]].toInt(b)
   }
@@ -285,7 +279,7 @@ case class Cast(child: Expression, dataType: DataType) 
extends UnaryExpression w
     case DateType =>
       buildCast[Int](_, d => null)
     case TimestampType =>
-      buildCast[Timestamp](_, t => timestampToLong(t).toShort)
+      buildCast[Long](_, t => timestampToLong(t).toShort)
     case x: NumericType =>
       b => x.numeric.asInstanceOf[Numeric[Any]].toInt(b).toShort
   }
@@ -301,7 +295,7 @@ case class Cast(child: Expression, dataType: DataType) 
extends UnaryExpression w
     case DateType =>
       buildCast[Int](_, d => null)
     case TimestampType =>
-      buildCast[Timestamp](_, t => timestampToLong(t).toByte)
+      buildCast[Long](_, t => timestampToLong(t).toByte)
     case x: NumericType =>
       b => x.numeric.asInstanceOf[Numeric[Any]].toInt(b).toByte
   }
@@ -334,7 +328,7 @@ case class Cast(child: Expression, dataType: DataType) 
extends UnaryExpression w
       buildCast[Int](_, d => null) // date can't cast to decimal in Hive
     case TimestampType =>
       // Note that we lose precision here.
-      buildCast[Timestamp](_, t => 
changePrecision(Decimal(timestampToDouble(t)), target))
+      buildCast[Long](_, t => changePrecision(Decimal(timestampToDouble(t)), 
target))
     case DecimalType() =>
       b => changePrecision(b.asInstanceOf[Decimal].clone(), target)
     case LongType =>
@@ -358,7 +352,7 @@ case class Cast(child: Expression, dataType: DataType) 
extends UnaryExpression w
     case DateType =>
       buildCast[Int](_, d => null)
     case TimestampType =>
-      buildCast[Timestamp](_, t => timestampToDouble(t))
+      buildCast[Long](_, t => timestampToDouble(t))
     case x: NumericType =>
       b => x.numeric.asInstanceOf[Numeric[Any]].toDouble(b)
   }
@@ -374,7 +368,7 @@ case class Cast(child: Expression, dataType: DataType) 
extends UnaryExpression w
     case DateType =>
       buildCast[Int](_, d => null)
     case TimestampType =>
-      buildCast[Timestamp](_, t => timestampToDouble(t).toFloat)
+      buildCast[Long](_, t => timestampToDouble(t).toFloat)
     case x: NumericType =>
       b => x.numeric.asInstanceOf[Numeric[Any]].toFloat(b)
   }

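The Cast helpers above all reduce to fixed scale factors around the 100ns unit
(1 s = 10,000,000 x 100ns, 1 ms = 10,000 x 100ns). A sketch with illustrative
names, matching the helpers in this file:

    // milliseconds -> 100ns, as in longToTimestamp and the DateType branch
    def millisTo100ns(ms: Long): Long = ms * 10000L

    // 100ns -> whole seconds, as in timestampToLong (floor rather than
    // truncation, so negative values round toward minus infinity)
    def hundredNsToSeconds(t: Long): Long =
      math.floor(t.toDouble / 10000000L).toLong

    // 100ns -> fractional seconds, as in timestampToDouble
    def hundredNsToSecondsDouble(t: Long): Double = t / 10000000.0

    // decimal seconds -> 100ns, as in decimalToTimestamp
    def decimalSecondsTo100ns(d: BigDecimal): Long = (d * 10000000L).longValue
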
http://git-wip-us.apache.org/repos/asf/spark/blob/37719e0c/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SpecificMutableRow.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SpecificMutableRow.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SpecificMutableRow.scala
index aa4099e..2c88451 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SpecificMutableRow.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SpecificMutableRow.scala
@@ -203,6 +203,7 @@ final class SpecificMutableRow(val values: 
Array[MutableValue]) extends MutableR
         case BooleanType => new MutableBoolean
         case LongType => new MutableLong
         case DateType => new MutableInt // We use INT for DATE internally
+        case TimestampType => new MutableLong  // We use Long for Timestamp 
internally
         case _ => new MutableAny
       }.toArray)
 

http://git-wip-us.apache.org/repos/asf/spark/blob/37719e0c/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
index e95682f..80aa8fa 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
@@ -122,7 +122,7 @@ class CodeGenContext {
     case BinaryType => "byte[]"
     case StringType => stringType
     case DateType => "int"
-    case TimestampType => "java.sql.Timestamp"
+    case TimestampType => "long"
     case dt: OpenHashSetUDT if dt.elementType == IntegerType => 
classOf[IntegerHashSet].getName
     case dt: OpenHashSetUDT if dt.elementType == LongType => 
classOf[LongHashSet].getName
     case _ => "Object"
@@ -140,6 +140,7 @@ class CodeGenContext {
     case FloatType => "Float"
     case BooleanType => "Boolean"
     case DateType => "Integer"
+    case TimestampType => "Long"
     case _ => javaType(dt)
   }
 
@@ -155,6 +156,7 @@ class CodeGenContext {
     case DoubleType => "-1.0"
     case IntegerType => "-1"
     case DateType => "-1"
+    case TimestampType => "-1L"
     case _ => "null"
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/37719e0c/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateProjection.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateProjection.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateProjection.scala
index 7caf4aa..274429c 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateProjection.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateProjection.scala
@@ -73,7 +73,9 @@ object GenerateProjection extends 
CodeGenerator[Seq[Expression], Projection] {
 
     val specificAccessorFunctions = ctx.nativeTypes.map { dataType =>
       val cases = expressions.zipWithIndex.map {
-        case (e, i) if e.dataType == dataType =>
+        case (e, i) if e.dataType == dataType
+          || dataType == IntegerType && e.dataType == DateType
+          || dataType == LongType && e.dataType == TimestampType =>
           s"case $i: return c$i;"
         case _ => ""
       }.mkString("\n        ")
@@ -96,7 +98,9 @@ object GenerateProjection extends 
CodeGenerator[Seq[Expression], Projection] {
 
     val specificMutatorFunctions = ctx.nativeTypes.map { dataType =>
       val cases = expressions.zipWithIndex.map {
-        case (e, i) if e.dataType == dataType =>
+        case (e, i) if e.dataType == dataType
+          || dataType == IntegerType && e.dataType == DateType
+          || dataType == LongType && e.dataType == TimestampType =>
           s"case $i: { c$i = value; return; }"
         case _ => ""
       }.mkString("\n")
@@ -119,7 +123,7 @@ object GenerateProjection extends 
CodeGenerator[Seq[Expression], Projection] {
       val nonNull = e.dataType match {
         case BooleanType => s"$col ? 0 : 1"
         case ByteType | ShortType | IntegerType | DateType => s"$col"
-        case LongType => s"$col ^ ($col >>> 32)"
+        case LongType | TimestampType => s"$col ^ ($col >>> 32)"
         case FloatType => s"Float.floatToIntBits($col)"
         case DoubleType =>
             s"(int)(Double.doubleToLongBits($col) ^ 
(Double.doubleToLongBits($col) >>> 32))"

http://git-wip-us.apache.org/repos/asf/spark/blob/37719e0c/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala
index 297b35b..833c08a 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala
@@ -37,7 +37,7 @@ object Literal {
     case d: BigDecimal => Literal(Decimal(d), DecimalType.Unlimited)
     case d: java.math.BigDecimal => Literal(Decimal(d), DecimalType.Unlimited)
     case d: Decimal => Literal(d, DecimalType.Unlimited)
-    case t: Timestamp => Literal(t, TimestampType)
+    case t: Timestamp => Literal(DateUtils.fromJavaTimestamp(t), TimestampType)
     case d: Date => Literal(DateUtils.fromJavaDate(d), DateType)
     case a: Array[Byte] => Literal(a, BinaryType)
     case null => Literal(null, NullType)
@@ -100,7 +100,7 @@ case class Literal protected (value: Any, dataType: 
DataType) extends LeafExpres
           ev.isNull = "false"
           ev.primitive = value.toString
           ""
-        case FloatType =>  // This must go before NumericType
+        case FloatType =>
           val v = value.asInstanceOf[Float]
           if (v.isNaN || v.isInfinite) {
             super.genCode(ctx, ev)
@@ -109,7 +109,7 @@ case class Literal protected (value: Any, dataType: 
DataType) extends LeafExpres
             ev.primitive = s"${value}f"
             ""
           }
-        case DoubleType =>  // This must go before NumericType
+        case DoubleType =>
           val v = value.asInstanceOf[Double]
           if (v.isNaN || v.isInfinite) {
             super.genCode(ctx, ev)
@@ -118,15 +118,18 @@ case class Literal protected (value: Any, dataType: 
DataType) extends LeafExpres
             ev.primitive = s"${value}"
             ""
           }
-
-        case ByteType | ShortType =>  // This must go before NumericType
+        case ByteType | ShortType =>
           ev.isNull = "false"
           ev.primitive = s"(${ctx.javaType(dataType)})$value"
           ""
-        case dt: NumericType if !dt.isInstanceOf[DecimalType] =>
+        case IntegerType | DateType =>
           ev.isNull = "false"
           ev.primitive = value.toString
           ""
+        case TimestampType | LongType =>
+          ev.isNull = "false"
+          ev.primitive = s"${value}L"
+          ""
         // eval() version may be faster for non-primitive types
         case other =>
           super.genCode(ctx, ev)

http://git-wip-us.apache.org/repos/asf/spark/blob/37719e0c/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala
index 3cbdfdf..2c49352 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala
@@ -254,9 +254,9 @@ abstract class BinaryComparison extends BinaryExpression 
with Predicate {
       case dt: NumericType if ctx.isNativeType(dt) => defineCodeGen (ctx, ev, {
         (c1, c3) => s"$c1 $symbol $c3"
       })
-      case TimestampType =>
-        // java.sql.Timestamp does not have compare()
-        super.genCode(ctx, ev)
+      case DateType | TimestampType => defineCodeGen (ctx, ev, {
+        (c1, c3) => s"$c1 $symbol $c3"
+      })
       case other => defineCodeGen (ctx, ev, {
         (c1, c2) => s"$c1.compare($c2) $symbol 0"
       })

http://git-wip-us.apache.org/repos/asf/spark/blob/37719e0c/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateUtils.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateUtils.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateUtils.scala
index ad649ac..5cadc14 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateUtils.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateUtils.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql.catalyst.util
 
-import java.sql.Date
+import java.sql.{Timestamp, Date}
 import java.text.SimpleDateFormat
 import java.util.{Calendar, TimeZone}
 
@@ -28,6 +28,7 @@ import org.apache.spark.sql.catalyst.expressions.Cast
  */
 object DateUtils {
   private val MILLIS_PER_DAY = 86400000
+  private val HUNDRED_NANOS_PER_SECOND = 10000000L
 
   // Java TimeZone has no mention of thread safety. Use thread local instance 
to be safe.
   private val LOCAL_TIMEZONE = new ThreadLocal[TimeZone] {
@@ -45,17 +46,17 @@ object DateUtils {
     ((millisLocal + LOCAL_TIMEZONE.get().getOffset(millisLocal)) / 
MILLIS_PER_DAY).toInt
   }
 
-  private def toMillisSinceEpoch(days: Int): Long = {
+  def toMillisSinceEpoch(days: Int): Long = {
     val millisUtc = days.toLong * MILLIS_PER_DAY
     millisUtc - LOCAL_TIMEZONE.get().getOffset(millisUtc)
   }
 
-  def fromJavaDate(date: java.sql.Date): Int = {
+  def fromJavaDate(date: Date): Int = {
     javaDateToDays(date)
   }
 
-  def toJavaDate(daysSinceEpoch: Int): java.sql.Date = {
-    new java.sql.Date(toMillisSinceEpoch(daysSinceEpoch))
+  def toJavaDate(daysSinceEpoch: Int): Date = {
+    new Date(toMillisSinceEpoch(daysSinceEpoch))
   }
 
   def toString(days: Int): String = 
Cast.threadLocalDateFormat.get.format(toJavaDate(days))
@@ -64,9 +65,9 @@ object DateUtils {
     if (!s.contains('T')) {
       // JDBC escape string
       if (s.contains(' ')) {
-        java.sql.Timestamp.valueOf(s)
+        Timestamp.valueOf(s)
       } else {
-        java.sql.Date.valueOf(s)
+        Date.valueOf(s)
       }
     } else if (s.endsWith("Z")) {
       // this is zero timezone of ISO8601
@@ -87,4 +88,33 @@ object DateUtils {
       ISO8601GMT.parse(s)
     }
   }
+
+  /**
+   * Return a java.sql.Timestamp from number of 100ns since epoch
+   */
+  def toJavaTimestamp(num100ns: Long): Timestamp = {
+    // setNanos() will overwrite the millisecond part, so the milliseconds 
should be
+    // cut off at seconds
+    var seconds = num100ns / HUNDRED_NANOS_PER_SECOND
+    var nanos = num100ns % HUNDRED_NANOS_PER_SECOND
+    // setNanos() can not accept negative value
+    if (nanos < 0) {
+      nanos += HUNDRED_NANOS_PER_SECOND
+      seconds -= 1
+    }
+    val t = new Timestamp(seconds * 1000)
+    t.setNanos(nanos.toInt * 100)
+    t
+  }
+
+  /**
+   * Return the number of 100ns since epoch from java.sql.Timestamp.
+   */
+  def fromJavaTimestamp(t: Timestamp): Long = {
+    if (t != null) {
+      t.getTime() * 10000L + (t.getNanos().toLong / 100) % 10000L
+    } else {
+      0L
+    }
+  }
 }

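The negative-value adjustment in toJavaTimestamp above is the subtle part. As a
worked example, num100ns = -1 (100ns before the epoch) first gives seconds = 0
and nanos = -1; the branch turns that into seconds = -1 and nanos = 9,999,999,
i.e. new Timestamp(-1000) with setNanos(999,999,900). Converting back,
fromJavaTimestamp returns getTime() * 10000 + (getNanos() / 100) % 10000 =
-1 * 10,000 + 9,999 = -1, so the round trip is exact (the -1L case in the new
DateUtilsSuite below exercises exactly this).
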
http://git-wip-us.apache.org/repos/asf/spark/blob/37719e0c/sql/catalyst/src/main/scala/org/apache/spark/sql/types/TimestampType.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/TimestampType.scala 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/TimestampType.scala
index aebabfc..a558641 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/TimestampType.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/TimestampType.scala
@@ -17,8 +17,6 @@
 
 package org.apache.spark.sql.types
 
-import java.sql.Timestamp
-
 import scala.math.Ordering
 import scala.reflect.runtime.universe.typeTag
 
@@ -38,18 +36,16 @@ class TimestampType private() extends AtomicType {
   // The companion object and this class is separated so the companion object 
also subclasses
   // this type. Otherwise, the companion object would be of type 
"TimestampType$" in byte code.
   // Defined with a private constructor so the companion object is the only 
possible instantiation.
-  private[sql] type InternalType = Timestamp
+  private[sql] type InternalType = Long
 
   @transient private[sql] lazy val tag = ScalaReflectionLock.synchronized { 
typeTag[InternalType] }
 
-  private[sql] val ordering = new Ordering[InternalType] {
-    def compare(x: Timestamp, y: Timestamp): Int = x.compareTo(y)
-  }
+  private[sql] val ordering = implicitly[Ordering[InternalType]]
 
   /**
    * The default size of a value of the TimestampType is 12 bytes.
    */
-  override def defaultSize: Int = 12
+  override def defaultSize: Int = 8
 
   private[spark] override def asNullable: TimestampType = this
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/37719e0c/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CastSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CastSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CastSuite.scala
index 5bc7c30..3aca94d 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CastSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CastSuite.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.catalyst.expressions
 import java.sql.{Timestamp, Date}
 
 import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.catalyst.util.DateUtils
 import org.apache.spark.sql.types._
 
 /**
@@ -137,7 +138,7 @@ class CastSuite extends SparkFunSuite with 
ExpressionEvalHelper {
     checkEvaluation(cast(cast(sd, DateType), StringType), sd)
     checkEvaluation(cast(cast(d, StringType), DateType), 0)
     checkEvaluation(cast(cast(nts, TimestampType), StringType), nts)
-    checkEvaluation(cast(cast(ts, StringType), TimestampType), ts)
+    checkEvaluation(cast(cast(ts, StringType), TimestampType), 
DateUtils.fromJavaTimestamp(ts))
 
     // all convert to string type to check
     checkEvaluation(cast(cast(cast(nts, TimestampType), DateType), 
StringType), sd)
@@ -269,9 +270,9 @@ class CastSuite extends SparkFunSuite with 
ExpressionEvalHelper {
     checkEvaluation(cast(ts, LongType), 15.toLong)
     checkEvaluation(cast(ts, FloatType), 15.002f)
     checkEvaluation(cast(ts, DoubleType), 15.002)
-    checkEvaluation(cast(cast(tss, ShortType), TimestampType), ts)
-    checkEvaluation(cast(cast(tss, IntegerType), TimestampType), ts)
-    checkEvaluation(cast(cast(tss, LongType), TimestampType), ts)
+    checkEvaluation(cast(cast(tss, ShortType), TimestampType), 
DateUtils.fromJavaTimestamp(ts))
+    checkEvaluation(cast(cast(tss, IntegerType), TimestampType), 
DateUtils.fromJavaTimestamp(ts))
+    checkEvaluation(cast(cast(tss, LongType), TimestampType), 
DateUtils.fromJavaTimestamp(ts))
     checkEvaluation(
       cast(cast(millis.toFloat / 1000, TimestampType), FloatType),
       millis.toFloat / 1000)
@@ -283,7 +284,7 @@ class CastSuite extends SparkFunSuite with 
ExpressionEvalHelper {
       Decimal(1))
 
     // A test for higher precision than millis
-    checkEvaluation(cast(cast(0.00000001, TimestampType), DoubleType), 
0.00000001)
+    checkEvaluation(cast(cast(0.0000001, TimestampType), DoubleType), 
0.0000001)
 
     checkEvaluation(cast(Double.NaN, TimestampType), null)
     checkEvaluation(cast(1.0 / 0.0, TimestampType), null)

http://git-wip-us.apache.org/repos/asf/spark/blob/37719e0c/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateUtilsSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateUtilsSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateUtilsSuite.scala
new file mode 100644
index 0000000..a424554
--- /dev/null
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateUtilsSuite.scala
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.util
+
+import java.sql.Timestamp
+
+import org.apache.spark.SparkFunSuite
+
+
+class DateUtilsSuite extends SparkFunSuite {
+
+  test("timestamp") {
+    val now = new Timestamp(System.currentTimeMillis())
+    now.setNanos(100)
+    val ns = DateUtils.fromJavaTimestamp(now)
+    assert(ns % 10000000L == 1)
+    assert(DateUtils.toJavaTimestamp(ns) == now)
+
+    List(-111111111111L, -1L, 0, 1L, 111111111111L).foreach { t =>
+      val ts = DateUtils.toJavaTimestamp(t)
+      assert(DateUtils.fromJavaTimestamp(ts) == t)
+      assert(DateUtils.toJavaTimestamp(DateUtils.fromJavaTimestamp(ts)) == ts)
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/37719e0c/sql/catalyst/src/test/scala/org/apache/spark/sql/types/DataTypeSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/types/DataTypeSuite.scala 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/types/DataTypeSuite.scala
index 261c4fc..077c0ad 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/types/DataTypeSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/types/DataTypeSuite.scala
@@ -190,7 +190,7 @@ class DataTypeSuite extends SparkFunSuite {
   checkDefaultSize(DecimalType(10, 5), 4096)
   checkDefaultSize(DecimalType.Unlimited, 4096)
   checkDefaultSize(DateType, 4)
-  checkDefaultSize(TimestampType, 12)
+  checkDefaultSize(TimestampType, 8)
   checkDefaultSize(StringType, 4096)
   checkDefaultSize(BinaryType, 4096)
   checkDefaultSize(ArrayType(DoubleType, true), 800)

http://git-wip-us.apache.org/repos/asf/spark/blob/37719e0c/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnStats.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnStats.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnStats.scala
index b0f983c..83881a3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnStats.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnStats.scala
@@ -17,10 +17,8 @@
 
 package org.apache.spark.sql.columnar
 
-import java.sql.Timestamp
-
 import org.apache.spark.sql.Row
-import org.apache.spark.sql.catalyst.expressions.{AttributeMap, Attribute, 
AttributeReference}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, 
AttributeReference}
 import org.apache.spark.sql.types._
 
 private[sql] class ColumnStatisticsSchema(a: Attribute) extends Serializable {
@@ -234,22 +232,7 @@ private[sql] class StringColumnStats extends ColumnStats {
 
 private[sql] class DateColumnStats extends IntColumnStats
 
-private[sql] class TimestampColumnStats extends ColumnStats {
-  protected var upper: Timestamp = null
-  protected var lower: Timestamp = null
-
-  override def gatherStats(row: Row, ordinal: Int): Unit = {
-    super.gatherStats(row, ordinal)
-    if (!row.isNullAt(ordinal)) {
-      val value = row(ordinal).asInstanceOf[Timestamp]
-      if (upper == null || value.compareTo(upper) > 0) upper = value
-      if (lower == null || value.compareTo(lower) < 0) lower = value
-      sizeInBytes += TIMESTAMP.defaultSize
-    }
-  }
-
-  override def collectedStatistics: Row = Row(lower, upper, nullCount, count, 
sizeInBytes)
-}
+private[sql] class TimestampColumnStats extends LongColumnStats
 
 private[sql] class BinaryColumnStats extends ColumnStats {
   override def gatherStats(row: Row, ordinal: Int): Unit = {

http://git-wip-us.apache.org/repos/asf/spark/blob/37719e0c/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnType.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnType.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnType.scala
index 20be5ca..c9c4d63 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnType.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnType.scala
@@ -18,7 +18,6 @@
 package org.apache.spark.sql.columnar
 
 import java.nio.ByteBuffer
-import java.sql.Timestamp
 
 import scala.reflect.runtime.universe.TypeTag
 
@@ -355,22 +354,20 @@ private[sql] object DATE extends 
NativeColumnType(DateType, 8, 4) {
   }
 }
 
-private[sql] object TIMESTAMP extends NativeColumnType(TimestampType, 9, 12) {
-  override def extract(buffer: ByteBuffer): Timestamp = {
-    val timestamp = new Timestamp(buffer.getLong())
-    timestamp.setNanos(buffer.getInt())
-    timestamp
+private[sql] object TIMESTAMP extends NativeColumnType(TimestampType, 9, 8) {
+  override def extract(buffer: ByteBuffer): Long = {
+    buffer.getLong
   }
 
-  override def append(v: Timestamp, buffer: ByteBuffer): Unit = {
-    buffer.putLong(v.getTime).putInt(v.getNanos)
+  override def append(v: Long, buffer: ByteBuffer): Unit = {
+    buffer.putLong(v)
   }
 
-  override def getField(row: Row, ordinal: Int): Timestamp = {
-    row(ordinal).asInstanceOf[Timestamp]
+  override def getField(row: Row, ordinal: Int): Long = {
+    row(ordinal).asInstanceOf[Long]
   }
 
-  override def setField(row: MutableRow, ordinal: Int, value: Timestamp): Unit 
= {
+  override def setField(row: MutableRow, ordinal: Int, value: Long): Unit = {
     row(ordinal) = value
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/37719e0c/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer2.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer2.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer2.scala
index 256d527..60f3b2d 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer2.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer2.scala
@@ -20,14 +20,13 @@ package org.apache.spark.sql.execution
 import java.io._
 import java.math.{BigDecimal, BigInteger}
 import java.nio.ByteBuffer
-import java.sql.Timestamp
 
 import scala.reflect.ClassTag
 
-import org.apache.spark.serializer._
 import org.apache.spark.Logging
+import org.apache.spark.serializer._
 import org.apache.spark.sql.Row
-import org.apache.spark.sql.catalyst.expressions.{SpecificMutableRow, 
MutableRow, GenericMutableRow}
+import org.apache.spark.sql.catalyst.expressions.{GenericMutableRow, 
MutableRow, SpecificMutableRow}
 import org.apache.spark.sql.types._
 
 /**
@@ -304,11 +303,7 @@ private[sql] object SparkSqlSerializer2 {
                 out.writeByte(NULL)
               } else {
                 out.writeByte(NOT_NULL)
-                val timestamp = row.getAs[java.sql.Timestamp](i)
-                val time = timestamp.getTime
-                val nanos = timestamp.getNanos
-                out.writeLong(time - (nanos / 1000000)) // Write the 
milliseconds value.
-                out.writeInt(nanos)                     // Write the 
nanoseconds part.
+                out.writeLong(row.getAs[Long](i))
               }
 
             case StringType =>
@@ -429,11 +424,7 @@ private[sql] object SparkSqlSerializer2 {
               if (in.readByte() == NULL) {
                 mutableRow.setNullAt(i)
               } else {
-                val time = in.readLong() // Read the milliseconds value.
-                val nanos = in.readInt() // Read the nanoseconds part.
-                val timestamp = new Timestamp(time)
-                timestamp.setNanos(nanos)
-                mutableRow.update(i, timestamp)
+                mutableRow.update(i, in.readLong())
               }
 
             case StringType =>

http://git-wip-us.apache.org/repos/asf/spark/blob/37719e0c/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala
index dffb265..720b529 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala
@@ -170,6 +170,8 @@ package object debug {
       case (_: Short, ShortType) =>
       case (_: Boolean, BooleanType) =>
       case (_: Double, DoubleType) =>
+      case (_: Int, DateType) =>
+      case (_: Long, TimestampType) =>
       case (v, udt: UserDefinedType[_]) => typeCheck(v, udt.sqlType)
 
       case (d, t) => sys.error(s"Invalid data found: got $d (${d.getClass}) 
expected $t")

http://git-wip-us.apache.org/repos/asf/spark/blob/37719e0c/sql/core/src/main/scala/org/apache/spark/sql/execution/pythonUdfs.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/pythonUdfs.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/pythonUdfs.scala
index 3425879..955b478 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/pythonUdfs.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/pythonUdfs.scala
@@ -148,6 +148,7 @@ object EvaluatePython {
     case (ud, udt: UserDefinedType[_]) => toJava(udt.serialize(ud), 
udt.sqlType)
 
     case (date: Int, DateType) => DateUtils.toJavaDate(date)
+    case (t: Long, TimestampType) => DateUtils.toJavaTimestamp(t)
     case (s: UTF8String, StringType) => s.toString
 
     // Pyrolite can handle Timestamp and Decimal
@@ -186,10 +187,12 @@ object EvaluatePython {
       }): Row
 
     case (c: java.util.Calendar, DateType) =>
-      DateUtils.fromJavaDate(new java.sql.Date(c.getTime().getTime()))
+      DateUtils.fromJavaDate(new java.sql.Date(c.getTimeInMillis))
 
     case (c: java.util.Calendar, TimestampType) =>
-      new java.sql.Timestamp(c.getTime().getTime())
+      c.getTimeInMillis * 10000L
+    case (t: java.sql.Timestamp, TimestampType) =>
+      DateUtils.fromJavaTimestamp(t)
 
     case (_, udt: UserDefinedType[_]) =>
       fromJava(obj, udt.sqlType)

http://git-wip-us.apache.org/repos/asf/spark/blob/37719e0c/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRDD.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRDD.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRDD.scala
index db68b9c..9028d5e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRDD.scala
@@ -385,7 +385,7 @@ private[sql] class JDBCRDD(
               // DateUtils.fromJavaDate does not handle null value, so we need 
to check it.
               val dateVal = rs.getDate(pos)
               if (dateVal != null) {
-                mutableRow.update(i, DateUtils.fromJavaDate(dateVal))
+                mutableRow.setInt(i, DateUtils.fromJavaDate(dateVal))
               } else {
                 mutableRow.update(i, null)
               }
@@ -417,7 +417,13 @@ private[sql] class JDBCRDD(
             case LongConversion => mutableRow.setLong(i, rs.getLong(pos))
             // TODO(davies): use getBytes for better performance, if the 
encoding is UTF-8
             case StringConversion => mutableRow.setString(i, rs.getString(pos))
-            case TimestampConversion => mutableRow.update(i, 
rs.getTimestamp(pos))
+            case TimestampConversion =>
+              val t = rs.getTimestamp(pos)
+              if (t != null) {
+                mutableRow.setLong(i, DateUtils.fromJavaTimestamp(t))
+              } else {
+                mutableRow.update(i, null)
+              }
             case BinaryConversion => mutableRow.update(i, rs.getBytes(pos))
             case BinaryLongConversion => {
               val bytes = rs.getBytes(pos)

http://git-wip-us.apache.org/repos/asf/spark/blob/37719e0c/sql/core/src/main/scala/org/apache/spark/sql/json/JacksonParser.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/json/JacksonParser.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/json/JacksonParser.scala
index 0e22375..4e07cf3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/json/JacksonParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/json/JacksonParser.scala
@@ -18,7 +18,6 @@
 package org.apache.spark.sql.json
 
 import java.io.ByteArrayOutputStream
-import java.sql.Timestamp
 
 import scala.collection.Map
 
@@ -65,10 +64,10 @@ private[sql] object JacksonParser {
         DateUtils.millisToDays(DateUtils.stringToTime(parser.getText).getTime)
 
       case (VALUE_STRING, TimestampType) =>
-        new Timestamp(DateUtils.stringToTime(parser.getText).getTime)
+        DateUtils.stringToTime(parser.getText).getTime * 10000L
 
       case (VALUE_NUMBER_INT, TimestampType) =>
-        new Timestamp(parser.getLongValue)
+        parser.getLongValue * 10000L
 
       case (_, StringType) =>
         val writer = new ByteArrayOutputStream()

http://git-wip-us.apache.org/repos/asf/spark/blob/37719e0c/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala
index 7e1e21f..fb0d137 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala
@@ -17,8 +17,6 @@
 
 package org.apache.spark.sql.json
 
-import java.sql.Timestamp
-
 import scala.collection.Map
 import scala.collection.convert.Wrappers.{JListWrapper, JMapWrapper}
 
@@ -398,11 +396,11 @@ private[sql] object JsonRDD extends Logging {
     }
   }
 
-  private def toTimestamp(value: Any): Timestamp = {
+  private def toTimestamp(value: Any): Long = {
     value match {
-      case value: java.lang.Integer => new 
Timestamp(value.asInstanceOf[Int].toLong)
-      case value: java.lang.Long => new Timestamp(value)
-      case value: java.lang.String => 
toTimestamp(DateUtils.stringToTime(value).getTime)
+      case value: java.lang.Integer => value.asInstanceOf[Int].toLong * 10000L
+      case value: java.lang.Long => value * 10000L
+      case value: java.lang.String => DateUtils.stringToTime(value).getTime * 
10000L
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/37719e0c/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala
index 85c2ce7..ddc5097 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala
@@ -28,6 +28,7 @@ import org.apache.parquet.io.api.{PrimitiveConverter, 
GroupConverter, Binary, Co
 import org.apache.parquet.schema.MessageType
 
 import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.util.DateUtils
 import org.apache.spark.sql.parquet.CatalystConverter.FieldType
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.parquet.timestamp.NanoTime
@@ -266,8 +267,8 @@ private[parquet] abstract class CatalystConverter extends 
GroupConverter {
   /**
    * Read a Timestamp value from a Parquet Int96Value
    */
-  protected[parquet] def readTimestamp(value: Binary): Timestamp = {
-    CatalystTimestampConverter.convertToTimestamp(value)
+  protected[parquet] def readTimestamp(value: Binary): Long = {
+    
DateUtils.fromJavaTimestamp(CatalystTimestampConverter.convertToTimestamp(value))
   }
 }
 
@@ -401,7 +402,7 @@ private[parquet] class CatalystPrimitiveRowConverter(
     current.setInt(fieldIndex, value)
 
   override protected[parquet] def updateDate(fieldIndex: Int, value: Int): 
Unit =
-    current.update(fieldIndex, value)
+    current.setInt(fieldIndex, value)
 
   override protected[parquet] def updateLong(fieldIndex: Int, value: Long): 
Unit =
     current.setLong(fieldIndex, value)
@@ -425,7 +426,7 @@ private[parquet] class CatalystPrimitiveRowConverter(
     current.update(fieldIndex, UTF8String(value))
 
   override protected[parquet] def updateTimestamp(fieldIndex: Int, value: 
Binary): Unit =
-    current.update(fieldIndex, readTimestamp(value))
+    current.setLong(fieldIndex, readTimestamp(value))
 
   override protected[parquet] def updateDecimal(
       fieldIndex: Int, value: Binary, ctype: DecimalType): Unit = {

http://git-wip-us.apache.org/repos/asf/spark/blob/37719e0c/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala
index 89db408..e03dbde 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala
@@ -29,6 +29,7 @@ import org.apache.parquet.schema.MessageType
 
 import org.apache.spark.Logging
 import org.apache.spark.sql.catalyst.expressions.{Attribute, Row}
+import org.apache.spark.sql.catalyst.util.DateUtils
 import org.apache.spark.sql.types._
 
 /**
@@ -204,7 +205,7 @@ private[parquet] class RowWriteSupport extends 
WriteSupport[Row] with Logging {
         case IntegerType => writer.addInteger(value.asInstanceOf[Int])
         case ShortType => writer.addInteger(value.asInstanceOf[Short])
         case LongType => writer.addLong(value.asInstanceOf[Long])
-        case TimestampType => 
writeTimestamp(value.asInstanceOf[java.sql.Timestamp])
+        case TimestampType => writeTimestamp(value.asInstanceOf[Long])
         case ByteType => writer.addInteger(value.asInstanceOf[Byte])
         case DoubleType => writer.addDouble(value.asInstanceOf[Double])
         case FloatType => writer.addFloat(value.asInstanceOf[Float])
@@ -311,8 +312,9 @@ private[parquet] class RowWriteSupport extends 
WriteSupport[Row] with Logging {
     writer.addBinary(Binary.fromByteArray(scratchBytes, 0, numBytes))
   }
 
-  private[parquet] def writeTimestamp(ts: java.sql.Timestamp): Unit = {
-    val binaryNanoTime = CatalystTimestampConverter.convertFromTimestamp(ts)
+  private[parquet] def writeTimestamp(ts: Long): Unit = {
+    val binaryNanoTime = CatalystTimestampConverter.convertFromTimestamp(
+      DateUtils.toJavaTimestamp(ts))
     writer.addBinary(binaryNanoTime)
   }
 }
@@ -357,7 +359,7 @@ private[parquet] class MutableRowWriteSupport extends 
RowWriteSupport {
       case FloatType => writer.addFloat(record.getFloat(index))
       case BooleanType => writer.addBoolean(record.getBoolean(index))
       case DateType => writer.addInteger(record.getInt(index))
-      case TimestampType => writeTimestamp(record(index).asInstanceOf[Long])
+      case TimestampType => writeTimestamp(record(index).asInstanceOf[Long])
       case d: DecimalType =>
         if (d.precisionInfo == None || d.precisionInfo.get.precision > 18) {
           sys.error(s"Unsupported datatype $d, cannot write to consumer")

http://git-wip-us.apache.org/repos/asf/spark/blob/37719e0c/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
index 72e60d9..17a3cec 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
@@ -25,7 +25,7 @@ import org.scalatest.concurrent.Eventually._
 import org.apache.spark.Accumulators
 import org.apache.spark.sql.TestData._
 import org.apache.spark.sql.columnar._
-import org.apache.spark.storage.{RDDBlockId, StorageLevel}
+import org.apache.spark.storage.{StorageLevel, RDDBlockId}
 
 case class BigData(s: String)
 

http://git-wip-us.apache.org/repos/asf/spark/blob/37719e0c/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnStatsSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnStatsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnStatsSuite.scala
index 339e719..1683662 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnStatsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnStatsSuite.scala
@@ -31,7 +31,7 @@ class ColumnStatsSuite extends SparkFunSuite {
   testColumnStats(classOf[FixedDecimalColumnStats], FIXED_DECIMAL(15, 10), Row(null, null, 0))
   testColumnStats(classOf[StringColumnStats], STRING, Row(null, null, 0))
   testColumnStats(classOf[DateColumnStats], DATE, Row(Int.MaxValue, Int.MinValue, 0))
-  testColumnStats(classOf[TimestampColumnStats], TIMESTAMP, Row(null, null, 0))
+  testColumnStats(classOf[TimestampColumnStats], TIMESTAMP, Row(Long.MaxValue, Long.MinValue, 0))
 
   def testColumnStats[T <: AtomicType, U <: ColumnStats](
       columnStatsClass: Class[U],

http://git-wip-us.apache.org/repos/asf/spark/blob/37719e0c/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnTypeSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnTypeSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnTypeSuite.scala
index a1e76ea..8421e67 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnTypeSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnTypeSuite.scala
@@ -18,17 +18,16 @@
 package org.apache.spark.sql.columnar
 
 import java.nio.ByteBuffer
-import java.sql.Timestamp
 
-import com.esotericsoftware.kryo.{Serializer, Kryo}
 import com.esotericsoftware.kryo.io.{Input, Output}
-import org.apache.spark.serializer.KryoRegistrator
+import com.esotericsoftware.kryo.{Kryo, Serializer}
 
-import org.apache.spark.{Logging, SparkConf, SparkFunSuite}
+import org.apache.spark.serializer.KryoRegistrator
 import org.apache.spark.sql.catalyst.expressions.GenericMutableRow
 import org.apache.spark.sql.columnar.ColumnarTestUtils._
 import org.apache.spark.sql.execution.SparkSqlSerializer
 import org.apache.spark.sql.types._
+import org.apache.spark.{Logging, SparkConf, SparkFunSuite}
 
 class ColumnTypeSuite extends SparkFunSuite with Logging {
   val DEFAULT_BUFFER_SIZE = 512
@@ -36,7 +35,7 @@ class ColumnTypeSuite extends SparkFunSuite with Logging {
   test("defaultSize") {
     val checks = Map(
       INT -> 4, SHORT -> 2, LONG -> 8, BYTE -> 1, DOUBLE -> 8, FLOAT -> 4,
-      FIXED_DECIMAL(15, 10) -> 8, BOOLEAN -> 1, STRING -> 8, DATE -> 4, TIMESTAMP -> 12,
+      FIXED_DECIMAL(15, 10) -> 8, BOOLEAN -> 1, STRING -> 8, DATE -> 4, TIMESTAMP -> 8,
       BINARY -> 16, GENERIC -> 16)
 
     checks.foreach { case (columnType, expectedSize) =>
@@ -69,7 +68,7 @@ class ColumnTypeSuite extends SparkFunSuite with Logging {
     checkActualSize(BOOLEAN, true, 1)
     checkActualSize(STRING, UTF8String("hello"), 4 + 
"hello".getBytes("utf-8").length)
     checkActualSize(DATE, 0, 4)
-    checkActualSize(TIMESTAMP, new Timestamp(0L), 12)
+    checkActualSize(TIMESTAMP, 0L, 8)
 
     val binary = Array.fill[Byte](4)(0: Byte)
     checkActualSize(BINARY, binary, 4 + 4)
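
The expected sizes drop from 12 to 8 bytes because the columnar TIMESTAMP type now has the
same layout as LONG: one 8-byte value per row. Schematically (a sketch, not the real
ColumnType code):

  import java.nio.ByteBuffer

  val buffer = ByteBuffer.allocate(8)
  buffer.putLong(36015435435L)        // one timestamp in 100ns units: 8 bytes, same as LONG
  assert(buffer.position() == 8)      // matches the new defaultSize and actualSize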

http://git-wip-us.apache.org/repos/asf/spark/blob/37719e0c/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnarTestUtils.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnarTestUtils.scala b/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnarTestUtils.scala
index 75d993e..c5d3859 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnarTestUtils.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnarTestUtils.scala
@@ -17,14 +17,12 @@
 
 package org.apache.spark.sql.columnar
 
-import java.sql.Timestamp
-
 import scala.collection.immutable.HashSet
 import scala.util.Random
 
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.catalyst.expressions.GenericMutableRow
-import org.apache.spark.sql.types.{UTF8String, DataType, Decimal, AtomicType}
+import org.apache.spark.sql.types.{AtomicType, DataType, Decimal, UTF8String}
 
 object ColumnarTestUtils {
   def makeNullRow(length: Int): GenericMutableRow = {
@@ -52,10 +50,7 @@ object ColumnarTestUtils {
       case BOOLEAN => Random.nextBoolean()
       case BINARY => randomBytes(Random.nextInt(32))
       case DATE => Random.nextInt()
-      case TIMESTAMP =>
-        val timestamp = new Timestamp(Random.nextLong())
-        timestamp.setNanos(Random.nextInt(999999999))
-        timestamp
+      case TIMESTAMP => Random.nextLong()
       case _ =>
         // Using a random one-element map instead of an arbitrary object
         Map(Random.nextInt() -> Random.nextString(Random.nextInt(32)))

http://git-wip-us.apache.org/repos/asf/spark/blob/37719e0c/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
index 49d348c..69ab1c2 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
@@ -326,7 +326,7 @@ class JDBCSuite extends SparkFunSuite with BeforeAndAfter {
     assert(cal.get(Calendar.HOUR) === 11)
     assert(cal.get(Calendar.MINUTE) === 22)
     assert(cal.get(Calendar.SECOND) === 33)
-    assert(rows(0).getAs[java.sql.Timestamp](2).getNanos === 543543543)
+    assert(rows(0).getAs[java.sql.Timestamp](2).getNanos === 543543500)
   }
 
   test("test DATE types") {

http://git-wip-us.apache.org/repos/asf/spark/blob/37719e0c/sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala
index d889c7b..fca2436 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala
@@ -76,21 +76,25 @@ class JsonSuite extends QueryTest with TestJsonData {
     checkTypePromotion(
       Decimal(doubleNumber), enforceCorrectType(doubleNumber, DecimalType.Unlimited))
 
-    checkTypePromotion(new Timestamp(intNumber), enforceCorrectType(intNumber, TimestampType))
-    checkTypePromotion(new Timestamp(intNumber.toLong),
+    checkTypePromotion(DateUtils.fromJavaTimestamp(new Timestamp(intNumber)),
+        enforceCorrectType(intNumber, TimestampType))
+    checkTypePromotion(DateUtils.fromJavaTimestamp(new Timestamp(intNumber.toLong)),
         enforceCorrectType(intNumber.toLong, TimestampType))
     val strTime = "2014-09-30 12:34:56"
-    checkTypePromotion(Timestamp.valueOf(strTime), enforceCorrectType(strTime, TimestampType))
+    checkTypePromotion(DateUtils.fromJavaTimestamp(Timestamp.valueOf(strTime)),
+        enforceCorrectType(strTime, TimestampType))
 
     val strDate = "2014-10-15"
     checkTypePromotion(
       DateUtils.fromJavaDate(Date.valueOf(strDate)), enforceCorrectType(strDate, DateType))
 
     val ISO8601Time1 = "1970-01-01T01:00:01.0Z"
-    checkTypePromotion(new Timestamp(3601000), enforceCorrectType(ISO8601Time1, TimestampType))
+    checkTypePromotion(DateUtils.fromJavaTimestamp(new Timestamp(3601000)),
+        enforceCorrectType(ISO8601Time1, TimestampType))
     checkTypePromotion(DateUtils.millisToDays(3601000), enforceCorrectType(ISO8601Time1, DateType))
     val ISO8601Time2 = "1970-01-01T02:00:01-01:00"
-    checkTypePromotion(new Timestamp(10801000), enforceCorrectType(ISO8601Time2, TimestampType))
+    checkTypePromotion(DateUtils.fromJavaTimestamp(new Timestamp(10801000)),
+        enforceCorrectType(ISO8601Time2, TimestampType))
     checkTypePromotion(DateUtils.millisToDays(10801000), enforceCorrectType(ISO8601Time2, DateType))
   }
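
enforceCorrectType now yields the internal Long (in 100ns units) rather than a
java.sql.Timestamp, so the expectations wrap the old values in DateUtils.fromJavaTimestamp.
For ISO8601Time1 above, the promoted value works out to (illustrative arithmetic only):

  // "1970-01-01T01:00:01.0Z" is 3,601,000 ms since the epoch
  val millis = 3601000L
  val internal = millis / 1000 * 10000000L   // 36010000000 in 100ns units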
 

http://git-wip-us.apache.org/repos/asf/spark/blob/37719e0c/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala b/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala
index 0693c7e..82c0b49 100644
--- a/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala
+++ b/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala
@@ -252,7 +252,11 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter {
     "load_dyn_part14.*", // These work alone but fail when run with other tests...
 
     // the answer is sensitive for jdk version
-    "udf_java_method"
+    "udf_java_method",
+
+    // Spark SQL uses Long for TimestampType, losing precision finer than 100ns
+    "timestamp_1",
+    "timestamp_2"
   )
 
   /**
@@ -795,8 +799,6 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter {
     "stats_publisher_error_1",
     "subq2",
     "tablename_with_select",
-    "timestamp_1",
-    "timestamp_2",
     "timestamp_3",
     "timestamp_comparison",
     "timestamp_lazy",

http://git-wip-us.apache.org/repos/asf/spark/blob/37719e0c/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala
index c466203..1f14cba 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala
@@ -250,7 +250,8 @@ private[hive] trait HiveInspectors {
         PrimitiveObjectInspectorFactory.javaHiveDecimalObjectInspector,
         poi.getWritableConstantValue.getHiveDecimal)
     case poi: WritableConstantTimestampObjectInspector =>
-      poi.getWritableConstantValue.getTimestamp.clone()
+      val t = poi.getWritableConstantValue
+      t.getSeconds * 10000000L + t.getNanos / 100L
     case poi: WritableConstantIntObjectInspector =>
       poi.getWritableConstantValue.get()
     case poi: WritableConstantDoubleObjectInspector =>
@@ -313,11 +314,11 @@ private[hive] trait HiveInspectors {
       case x: DateObjectInspector if x.preferWritable() =>
         DateUtils.fromJavaDate(x.getPrimitiveWritableObject(data).get())
       case x: DateObjectInspector => DateUtils.fromJavaDate(x.getPrimitiveJavaObject(data))
-      // org.apache.hadoop.hive.serde2.io.TimestampWritable.set will reset current time object
-      // if next timestamp is null, so Timestamp object is cloned
       case x: TimestampObjectInspector if x.preferWritable() =>
-        x.getPrimitiveWritableObject(data).getTimestamp.clone()
-      case ti: TimestampObjectInspector => ti.getPrimitiveJavaObject(data).clone()
+        val t = x.getPrimitiveWritableObject(data)
+        t.getSeconds * 10000000L + t.getNanos / 100
+      case ti: TimestampObjectInspector =>
+        DateUtils.fromJavaTimestamp(ti.getPrimitiveJavaObject(data))
       case _ => pi.getPrimitiveJavaObject(data)
     }
     case li: ListObjectInspector =>
@@ -356,6 +357,9 @@ private[hive] trait HiveInspectors {
     case _: JavaDateObjectInspector =>
       (o: Any) => DateUtils.toJavaDate(o.asInstanceOf[Int])
 
+    case _: JavaTimestampObjectInspector =>
+      (o: Any) => DateUtils.toJavaTimestamp(o.asInstanceOf[Long])
+
     case soi: StandardStructObjectInspector =>
       val wrappers = soi.getAllStructFieldRefs.map(ref => wrapperFor(ref.getFieldObjectInspector))
       (o: Any) => {
@@ -465,7 +469,7 @@ private[hive] trait HiveInspectors {
       case _: DateObjectInspector if x.preferWritable() => getDateWritable(a)
       case _: DateObjectInspector => DateUtils.toJavaDate(a.asInstanceOf[Int])
       case _: TimestampObjectInspector if x.preferWritable() => getTimestampWritable(a)
-      case _: TimestampObjectInspector => a.asInstanceOf[java.sql.Timestamp]
-      case _: TimestampObjectInspector => DateUtils.toJavaTimestamp(a.asInstanceOf[Long])
     }
     case x: SettableStructObjectInspector =>
       val fieldRefs = x.getAllStructFieldRefs
@@ -727,7 +731,7 @@ private[hive] trait HiveInspectors {
       TypeInfoFactory.voidTypeInfo, null)
 
   private def getStringWritable(value: Any): hadoopIo.Text =
-    if (value == null) null else new hadoopIo.Text(value.asInstanceOf[UTF8String].toString)
+    if (value == null) null else new hadoopIo.Text(value.asInstanceOf[UTF8String].getBytes)
 
   private def getIntWritable(value: Any): hadoopIo.IntWritable =
     if (value == null) null else new hadoopIo.IntWritable(value.asInstanceOf[Int])
@@ -776,7 +780,7 @@ private[hive] trait HiveInspectors {
     if (value == null) {
       null
     } else {
-      new hiveIo.TimestampWritable(value.asInstanceOf[java.sql.Timestamp])
+      new hiveIo.TimestampWritable(DateUtils.toJavaTimestamp(value.asInstanceOf[Long]))
     }
 
   private def getDecimalWritable(value: Any): hiveIo.HiveDecimalWritable =
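
The writable fast path builds the internal value directly from the TimestampWritable's
seconds and nanos, avoiding an intermediate java.sql.Timestamp. A quick consistency check (a
sketch that uses java.sql.Timestamp to stand in for the writable; not code from the patch):

  import java.sql.Timestamp

  val ts = new Timestamp(3601000L)                      // 1970-01-01T01:00:01Z
  ts.setNanos(543543543)
  val seconds = ts.getTime / 1000                       // plays the role of getSeconds
  val fast = seconds * 10000000L + ts.getNanos / 100    // 36015435435
  // the same value the java-object path produces via DateUtils.fromJavaTimestamp(ts)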

http://git-wip-us.apache.org/repos/asf/spark/blob/37719e0c/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
index 334bfcc..d3c82d8 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
@@ -363,10 +363,10 @@ private[hive] object HadoopTableReader extends HiveInspectors with Logging {
             row.update(ordinal, HiveShim.toCatalystDecimal(oi, value))
         case oi: TimestampObjectInspector =>
           (value: Any, row: MutableRow, ordinal: Int) =>
-            row.update(ordinal, oi.getPrimitiveJavaObject(value).clone())
+            row.setLong(ordinal, DateUtils.fromJavaTimestamp(oi.getPrimitiveJavaObject(value)))
         case oi: DateObjectInspector =>
           (value: Any, row: MutableRow, ordinal: Int) =>
-            row.update(ordinal, DateUtils.fromJavaDate(oi.getPrimitiveJavaObject(value)))
+            row.setInt(ordinal, DateUtils.fromJavaDate(oi.getPrimitiveJavaObject(value)))
         case oi: BinaryObjectInspector =>
           (value: Any, row: MutableRow, ordinal: Int) =>
             row.update(ordinal, oi.getPrimitiveJavaObject(value))

http://git-wip-us.apache.org/repos/asf/spark/blob/37719e0c/sql/hive/src/test/resources/golden/timestamp cast #5-0-dbd7bcd167d322d6617b884c02c7f247
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/resources/golden/timestamp cast #5-0-dbd7bcd167d322d6617b884c02c7f247 b/sql/hive/src/test/resources/golden/timestamp cast #5-0-dbd7bcd167d322d6617b884c02c7f247
index 27de46f..84a31a5 100644
--- a/sql/hive/src/test/resources/golden/timestamp cast #5-0-dbd7bcd167d322d6617b884c02c7f247
+++ b/sql/hive/src/test/resources/golden/timestamp cast #5-0-dbd7bcd167d322d6617b884c02c7f247
@@ -1 +1 @@
--0.0010000000000000009
+-0.001


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
