HIVE-19853: Arrow serializer needs to create a TimeStampMicroTZVector instead of TimeStampMicroVector (Teddy Choi, reviewed by Matt McCline)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/1a610cc5 Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/1a610cc5 Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/1a610cc5 Branch: refs/heads/master-txnstats Commit: 1a610cc545d39b9e9116c5b90108197853d0364c Parents: c4eb647 Author: Matt McCline <mmccl...@hortonworks.com> Authored: Mon Jun 18 15:55:00 2018 -0500 Committer: Matt McCline <mmccl...@hortonworks.com> Committed: Mon Jun 18 15:55:00 2018 -0500 ---------------------------------------------------------------------- .../hadoop/hive/ql/io/arrow/Deserializer.java | 94 +++++++------------- .../hadoop/hive/ql/io/arrow/Serializer.java | 15 ++-- 2 files changed, 40 insertions(+), 69 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/1a610cc5/ql/src/java/org/apache/hadoop/hive/ql/io/arrow/Deserializer.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/arrow/Deserializer.java b/ql/src/java/org/apache/hadoop/hive/ql/io/arrow/Deserializer.java index 6e09d39..edc4b39 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/arrow/Deserializer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/arrow/Deserializer.java @@ -29,9 +29,7 @@ import org.apache.arrow.vector.IntVector; import org.apache.arrow.vector.IntervalDayVector; import org.apache.arrow.vector.IntervalYearVector; import org.apache.arrow.vector.SmallIntVector; -import org.apache.arrow.vector.TimeStampMicroVector; -import org.apache.arrow.vector.TimeStampMilliVector; -import org.apache.arrow.vector.TimeStampNanoVector; +import org.apache.arrow.vector.TimeStampVector; import org.apache.arrow.vector.TinyIntVector; import org.apache.arrow.vector.VarBinaryVector; import org.apache.arrow.vector.VarCharVector; @@ -268,35 +266,11 @@ class Deserializer { } break; case TIMESTAMPMILLI: - { - for (int i = 0; i < size; i++) { - if (arrowVector.isNull(i)) { - VectorizedBatchUtil.setNullColIsNullValue(hiveVector, i); - } else { - hiveVector.isNull[i] = false; - - // Time = second + sub-second - final long timeInMillis = ((TimeStampMilliVector) arrowVector).get(i); - final TimestampColumnVector timestampColumnVector = (TimestampColumnVector) hiveVector; - int subSecondInNanos = (int) ((timeInMillis % MILLIS_PER_SECOND) * NS_PER_MILLIS); - long second = timeInMillis / MILLIS_PER_SECOND; - - // A nanosecond value should not be negative - if (subSecondInNanos < 0) { - - // So add one second to the negative nanosecond value to make it positive - subSecondInNanos += NS_PER_SECOND; - - // Subtract one second from the second value because we added one second - second -= 1; - } - timestampColumnVector.time[i] = second * MILLIS_PER_SECOND; - timestampColumnVector.nanos[i] = subSecondInNanos; - } - } - } - break; + case TIMESTAMPMILLITZ: case TIMESTAMPMICRO: + case TIMESTAMPMICROTZ: + case TIMESTAMPNANO: + case TIMESTAMPNANOTZ: { for (int i = 0; i < size; i++) { if (arrowVector.isNull(i)) { @@ -305,40 +279,36 @@ class Deserializer { hiveVector.isNull[i] = false; // Time = second + sub-second - final long timeInMicros = ((TimeStampMicroVector) arrowVector).get(i); - final TimestampColumnVector timestampColumnVector = (TimestampColumnVector) hiveVector; - int subSecondInNanos = (int) ((timeInMicros % MICROS_PER_SECOND) * NS_PER_MICROS); - long second = timeInMicros / MICROS_PER_SECOND; - - // A nanosecond value should not be negative - if (subSecondInNanos < 0) { - - // So add one second to the negative nanosecond value to make it positive - subSecondInNanos += NS_PER_SECOND; - - // Subtract one second from the second value because we added one second - second -= 1; + final long time = ((TimeStampVector) arrowVector).get(i); + long second; + int subSecondInNanos; + switch (minorType) { + case TIMESTAMPMILLI: + case TIMESTAMPMILLITZ: + { + subSecondInNanos = (int) ((time % MILLIS_PER_SECOND) * NS_PER_MILLIS); + second = time / MILLIS_PER_SECOND; + } + break; + case TIMESTAMPMICROTZ: + case TIMESTAMPMICRO: + { + subSecondInNanos = (int) ((time % MICROS_PER_SECOND) * NS_PER_MICROS); + second = time / MICROS_PER_SECOND; + } + break; + case TIMESTAMPNANOTZ: + case TIMESTAMPNANO: + { + subSecondInNanos = (int) (time % NS_PER_SECOND); + second = time / NS_PER_SECOND; + } + break; + default: + throw new IllegalArgumentException(); } - timestampColumnVector.time[i] = second * MILLIS_PER_SECOND; - timestampColumnVector.nanos[i] = subSecondInNanos; - } - } - } - break; - case TIMESTAMPNANO: - { - for (int i = 0; i < size; i++) { - if (arrowVector.isNull(i)) { - VectorizedBatchUtil.setNullColIsNullValue(hiveVector, i); - } else { - hiveVector.isNull[i] = false; - // Time = second + sub-second - final long timeInNanos = ((TimeStampNanoVector) arrowVector).get(i); final TimestampColumnVector timestampColumnVector = (TimestampColumnVector) hiveVector; - int subSecondInNanos = (int) (timeInNanos % NS_PER_SECOND); - long second = timeInNanos / NS_PER_SECOND; - // A nanosecond value should not be negative if (subSecondInNanos < 0) { http://git-wip-us.apache.org/repos/asf/hive/blob/1a610cc5/ql/src/java/org/apache/hadoop/hive/ql/io/arrow/Serializer.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/arrow/Serializer.java b/ql/src/java/org/apache/hadoop/hive/ql/io/arrow/Serializer.java index e6af916..2961050 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/arrow/Serializer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/arrow/Serializer.java @@ -30,7 +30,7 @@ import org.apache.arrow.vector.IntVector; import org.apache.arrow.vector.IntervalDayVector; import org.apache.arrow.vector.IntervalYearVector; import org.apache.arrow.vector.SmallIntVector; -import org.apache.arrow.vector.TimeStampMicroVector; +import org.apache.arrow.vector.TimeStampMicroTZVector; import org.apache.arrow.vector.TinyIntVector; import org.apache.arrow.vector.VarBinaryVector; import org.apache.arrow.vector.VarCharVector; @@ -38,6 +38,7 @@ import org.apache.arrow.vector.VectorSchemaRoot; import org.apache.arrow.vector.complex.ListVector; import org.apache.arrow.vector.complex.MapVector; import org.apache.arrow.vector.complex.NullableMapVector; +import org.apache.arrow.vector.types.TimeUnit; import org.apache.arrow.vector.types.Types; import org.apache.arrow.vector.types.pojo.ArrowType; import org.apache.arrow.vector.types.pojo.FieldType; @@ -177,8 +178,8 @@ class Serializer { case DATE: return Types.MinorType.DATEDAY.getType(); case TIMESTAMP: - // HIVE-19723: Prefer microsecond because Spark supports it - return Types.MinorType.TIMESTAMPMICRO.getType(); + // HIVE-19853: Prefer timestamp in microsecond with time zone because Spark supports it + return new ArrowType.Timestamp(TimeUnit.MICROSECOND, "UTC"); case BINARY: return Types.MinorType.VARBINARY.getType(); case DECIMAL: @@ -433,11 +434,11 @@ class Serializer { break; case TIMESTAMP: { - final TimeStampMicroVector timeStampMicroVector = (TimeStampMicroVector) arrowVector; + final TimeStampMicroTZVector timeStampMicroTZVector = (TimeStampMicroTZVector) arrowVector; final TimestampColumnVector timestampColumnVector = (TimestampColumnVector) hiveVector; for (int i = 0; i < size; i++) { if (hiveVector.isNull[i]) { - timeStampMicroVector.setNull(i); + timeStampMicroTZVector.setNull(i); } else { // Time = second + sub-second final long secondInMillis = timestampColumnVector.getTime(i); @@ -446,9 +447,9 @@ class Serializer { if ((secondInMillis > 0 && secondInMicros < 0) || (secondInMillis < 0 && secondInMicros > 0)) { // If the timestamp cannot be represented in long microsecond, set it as a null value - timeStampMicroVector.setNull(i); + timeStampMicroTZVector.setNull(i); } else { - timeStampMicroVector.set(i, secondInMicros + subSecondInMicros); + timeStampMicroTZVector.set(i, secondInMicros + subSecondInMicros); } } }