Repository: spark
Updated Branches:
  refs/heads/master 1b85bcd92 -> ac1ab6b9d


Revert "[SPARK-12297][SQL] Hive compatibility for Parquet Timestamps"

This reverts commit 22691556e5f0dfbac81b8cc9ca0a67c70c1711ca.

See JIRA ticket SPARK-12297 for more information.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ac1ab6b9
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ac1ab6b9
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ac1ab6b9

Branch: refs/heads/master
Commit: ac1ab6b9db188ac54c745558d57dd0a031d0b162
Parents: 1b85bcd
Author: Reynold Xin <r...@databricks.com>
Authored: Tue May 9 11:35:59 2017 -0700
Committer: Reynold Xin <r...@databricks.com>
Committed: Tue May 9 11:35:59 2017 -0700

----------------------------------------------------------------------
 .../spark/sql/catalyst/catalog/interface.scala  |   4 +-
 .../spark/sql/catalyst/util/DateTimeUtils.scala |   5 -
 .../parquet/VectorizedColumnReader.java         |  28 +-
 .../parquet/VectorizedParquetRecordReader.java  |   6 +-
 .../spark/sql/execution/command/tables.scala    |   8 +-
 .../datasources/parquet/ParquetFileFormat.scala |   2 -
 .../parquet/ParquetReadSupport.scala            |   3 +-
 .../parquet/ParquetRecordMaterializer.scala     |   9 +-
 .../parquet/ParquetRowConverter.scala           |  53 +--
 .../parquet/ParquetWriteSupport.scala           |  25 +-
 .../spark/sql/hive/HiveExternalCatalog.scala    |  11 +-
 .../spark/sql/hive/HiveMetastoreCatalog.scala   |  12 +-
 .../hive/ParquetHiveCompatibilitySuite.scala    | 379 +------------------
 13 files changed, 29 insertions(+), 516 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/ac1ab6b9/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
index c39017e..cc0cbba 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
@@ -132,10 +132,10 @@ case class CatalogTablePartition(
   /**
    * Given the partition schema, returns a row with that schema holding the 
partition values.
    */
-  def toRow(partitionSchema: StructType, defaultTimeZoneId: String): 
InternalRow = {
+  def toRow(partitionSchema: StructType, defaultTimeZondId: String): 
InternalRow = {
     val caseInsensitiveProperties = CaseInsensitiveMap(storage.properties)
     val timeZoneId = caseInsensitiveProperties.getOrElse(
-      DateTimeUtils.TIMEZONE_OPTION, defaultTimeZoneId)
+      DateTimeUtils.TIMEZONE_OPTION, defaultTimeZondId)
     InternalRow.fromSeq(partitionSchema.map { field =>
       val partValue = if (spec(field.name) == 
ExternalCatalogUtils.DEFAULT_PARTITION_NAME) {
         null

http://git-wip-us.apache.org/repos/asf/spark/blob/ac1ab6b9/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
index bf596fa..6c1592f 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
@@ -498,11 +498,6 @@ object DateTimeUtils {
     false
   }
 
-  lazy val validTimezones = TimeZone.getAvailableIDs().toSet
-  def isValidTimezone(timezoneId: String): Boolean = {
-    validTimezones.contains(timezoneId)
-  }
-
   /**
    * Returns the microseconds since year zero (-17999) from microseconds since 
epoch.
    */

http://git-wip-us.apache.org/repos/asf/spark/blob/ac1ab6b9/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java
 
b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java
index dabbc2b..9d641b5 100644
--- 
a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java
+++ 
b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java
@@ -18,9 +18,7 @@
 package org.apache.spark.sql.execution.datasources.parquet;
 
 import java.io.IOException;
-import java.util.TimeZone;
 
-import org.apache.hadoop.conf.Configuration;
 import org.apache.parquet.bytes.BytesUtils;
 import org.apache.parquet.column.ColumnDescriptor;
 import org.apache.parquet.column.Dictionary;
@@ -32,7 +30,6 @@ import org.apache.parquet.schema.PrimitiveType;
 
 import org.apache.spark.sql.catalyst.util.DateTimeUtils;
 import org.apache.spark.sql.execution.vectorized.ColumnVector;
-import org.apache.spark.sql.internal.SQLConf;
 import org.apache.spark.sql.types.DataTypes;
 import org.apache.spark.sql.types.DecimalType;
 
@@ -93,30 +90,11 @@ public class VectorizedColumnReader {
 
   private final PageReader pageReader;
   private final ColumnDescriptor descriptor;
-  private final TimeZone storageTz;
-  private final TimeZone sessionTz;
 
-  public VectorizedColumnReader(ColumnDescriptor descriptor, PageReader 
pageReader,
-                                Configuration conf)
+  public VectorizedColumnReader(ColumnDescriptor descriptor, PageReader 
pageReader)
       throws IOException {
     this.descriptor = descriptor;
     this.pageReader = pageReader;
-    // If the table has a timezone property, apply the correct conversions.  
See SPARK-12297.
-    // The conf is sometimes null in tests.
-    String sessionTzString =
-        conf == null ? null : conf.get(SQLConf.SESSION_LOCAL_TIMEZONE().key());
-    if (sessionTzString == null || sessionTzString.isEmpty()) {
-      sessionTz = DateTimeUtils.defaultTimeZone();
-    } else {
-      sessionTz = TimeZone.getTimeZone(sessionTzString);
-    }
-    String storageTzString =
-        conf == null ? null : 
conf.get(ParquetFileFormat.PARQUET_TIMEZONE_TABLE_PROPERTY());
-    if (storageTzString == null || storageTzString.isEmpty()) {
-      storageTz = sessionTz;
-    } else {
-      storageTz = TimeZone.getTimeZone(storageTzString);
-    }
     this.maxDefLevel = descriptor.getMaxDefinitionLevel();
 
     DictionaryPage dictionaryPage = pageReader.readDictionaryPage();
@@ -311,7 +289,7 @@ public class VectorizedColumnReader {
             // TODO: Convert dictionary of Binaries to dictionary of Longs
             if (!column.isNullAt(i)) {
               Binary v = dictionary.decodeToBinary(dictionaryIds.getDictId(i));
-              column.putLong(i, ParquetRowConverter.binaryToSQLTimestamp(v, 
sessionTz, storageTz));
+              column.putLong(i, ParquetRowConverter.binaryToSQLTimestamp(v));
             }
           }
         } else {
@@ -444,7 +422,7 @@ public class VectorizedColumnReader {
         if (defColumn.readInteger() == maxDefLevel) {
           column.putLong(rowId + i,
               // Read 12 bytes for INT96
-              ParquetRowConverter.binaryToSQLTimestamp(data.readBinary(12), 
sessionTz, storageTz));
+              ParquetRowConverter.binaryToSQLTimestamp(data.readBinary(12)));
         } else {
           column.putNull(rowId + i);
         }

http://git-wip-us.apache.org/repos/asf/spark/blob/ac1ab6b9/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java
 
b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java
index d8974dd..51bdf0f 100644
--- 
a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java
+++ 
b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java
@@ -21,7 +21,6 @@ import java.io.IOException;
 import java.util.Arrays;
 import java.util.List;
 
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.parquet.column.ColumnDescriptor;
@@ -96,8 +95,6 @@ public class VectorizedParquetRecordReader extends 
SpecificParquetRecordReaderBa
    */
   private boolean returnColumnarBatch;
 
-  private Configuration conf;
-
   /**
    * The default config on whether columnarBatch should be offheap.
    */
@@ -110,7 +107,6 @@ public class VectorizedParquetRecordReader extends 
SpecificParquetRecordReaderBa
   public void initialize(InputSplit inputSplit, TaskAttemptContext 
taskAttemptContext)
       throws IOException, InterruptedException, UnsupportedOperationException {
     super.initialize(inputSplit, taskAttemptContext);
-    this.conf = taskAttemptContext.getConfiguration();
     initializeInternal();
   }
 
@@ -281,7 +277,7 @@ public class VectorizedParquetRecordReader extends 
SpecificParquetRecordReaderBa
     for (int i = 0; i < columns.size(); ++i) {
       if (missingColumns[i]) continue;
       columnReaders[i] = new VectorizedColumnReader(columns.get(i),
-          pages.getPageReader(columns.get(i)), conf);
+          pages.getPageReader(columns.get(i)));
     }
     totalCountLoadedSoFar += pages.getRowCount();
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/ac1ab6b9/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
index 5843c5b..ebf03e1 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
@@ -26,6 +26,7 @@ import scala.collection.mutable.ArrayBuffer
 import scala.util.control.NonFatal
 import scala.util.Try
 
+import org.apache.commons.lang3.StringEscapeUtils
 import org.apache.hadoop.fs.Path
 
 import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
@@ -36,7 +37,7 @@ import 
org.apache.spark.sql.catalyst.catalog.CatalogTableType._
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.catalyst.expressions.{Attribute, 
AttributeReference}
 import org.apache.spark.sql.catalyst.util.quoteIdentifier
-import org.apache.spark.sql.execution.datasources.{DataSource, 
PartitioningUtils}
+import org.apache.spark.sql.execution.datasources.{DataSource, FileFormat, 
PartitioningUtils}
 import org.apache.spark.sql.execution.datasources.csv.CSVFileFormat
 import org.apache.spark.sql.execution.datasources.json.JsonFileFormat
 import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
@@ -73,10 +74,6 @@ case class CreateTableLikeCommand(
       sourceTableDesc.provider
     }
 
-    val properties = sourceTableDesc.properties.filter { case (k, _) =>
-      k == ParquetFileFormat.PARQUET_TIMEZONE_TABLE_PROPERTY
-    }
-
     // If the location is specified, we create an external table internally.
     // Otherwise create a managed table.
     val tblType = if (location.isEmpty) CatalogTableType.MANAGED else 
CatalogTableType.EXTERNAL
@@ -89,7 +86,6 @@ case class CreateTableLikeCommand(
           locationUri = location.map(CatalogUtils.stringToURI(_))),
         schema = sourceTableDesc.schema,
         provider = newProvider,
-        properties = properties,
         partitionColumnNames = sourceTableDesc.partitionColumnNames,
         bucketSpec = sourceTableDesc.bucketSpec)
 

http://git-wip-us.apache.org/repos/asf/spark/blob/ac1ab6b9/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
index 8113768..2f3a2c6 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
@@ -632,6 +632,4 @@ object ParquetFileFormat extends Logging {
         Failure(cause)
     }.toOption
   }
-
-  val PARQUET_TIMEZONE_TABLE_PROPERTY = "parquet.mr.int96.write.zone"
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/ac1ab6b9/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala
index bf395a0..f1a35dd 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala
@@ -95,8 +95,7 @@ private[parquet] class ParquetReadSupport extends 
ReadSupport[UnsafeRow] with Lo
     new ParquetRecordMaterializer(
       parquetRequestedSchema,
       ParquetReadSupport.expandUDT(catalystRequestedSchema),
-      new ParquetSchemaConverter(conf),
-      conf)
+      new ParquetSchemaConverter(conf))
   }
 }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/ac1ab6b9/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRecordMaterializer.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRecordMaterializer.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRecordMaterializer.scala
index df04199..4e49a0d 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRecordMaterializer.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRecordMaterializer.scala
@@ -17,7 +17,6 @@
 
 package org.apache.spark.sql.execution.datasources.parquet
 
-import org.apache.hadoop.conf.Configuration
 import org.apache.parquet.io.api.{GroupConverter, RecordMaterializer}
 import org.apache.parquet.schema.MessageType
 
@@ -30,17 +29,13 @@ import org.apache.spark.sql.types.StructType
  * @param parquetSchema Parquet schema of the records to be read
  * @param catalystSchema Catalyst schema of the rows to be constructed
  * @param schemaConverter A Parquet-Catalyst schema converter that helps 
initializing row converters
- * @param hadoopConf hadoop Configuration for passing extra params for parquet 
conversion
  */
 private[parquet] class ParquetRecordMaterializer(
-    parquetSchema: MessageType,
-    catalystSchema: StructType,
-    schemaConverter: ParquetSchemaConverter,
-    hadoopConf: Configuration)
+    parquetSchema: MessageType, catalystSchema: StructType, schemaConverter: 
ParquetSchemaConverter)
   extends RecordMaterializer[UnsafeRow] {
 
   private val rootConverter =
-    new ParquetRowConverter(schemaConverter, parquetSchema, catalystSchema, 
hadoopConf, NoopUpdater)
+    new ParquetRowConverter(schemaConverter, parquetSchema, catalystSchema, 
NoopUpdater)
 
   override def getCurrentRecord: UnsafeRow = rootConverter.currentRecord
 

http://git-wip-us.apache.org/repos/asf/spark/blob/ac1ab6b9/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
index d52ff62..32e6c60 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
@@ -19,12 +19,10 @@ package org.apache.spark.sql.execution.datasources.parquet
 
 import java.math.{BigDecimal, BigInteger}
 import java.nio.ByteOrder
-import java.util.TimeZone
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable.ArrayBuffer
 
-import org.apache.hadoop.conf.Configuration
 import org.apache.parquet.column.Dictionary
 import org.apache.parquet.io.api.{Binary, Converter, GroupConverter, 
PrimitiveConverter}
 import org.apache.parquet.schema.{GroupType, MessageType, OriginalType, Type}
@@ -36,7 +34,6 @@ import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, DateTimeUtils, 
GenericArrayData}
 import org.apache.spark.sql.catalyst.util.DateTimeUtils.SQLTimestamp
-import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.UTF8String
 
@@ -120,14 +117,12 @@ private[parquet] class ParquetPrimitiveConverter(val 
updater: ParentContainerUpd
  * @param parquetType Parquet schema of Parquet records
  * @param catalystType Spark SQL schema that corresponds to the Parquet record 
type. User-defined
  *        types should have been expanded.
- * @param hadoopConf a hadoop Configuration for passing any extra parameters 
for parquet conversion
  * @param updater An updater which propagates converted field values to the 
parent container
  */
 private[parquet] class ParquetRowConverter(
     schemaConverter: ParquetSchemaConverter,
     parquetType: GroupType,
     catalystType: StructType,
-    hadoopConf: Configuration,
     updater: ParentContainerUpdater)
   extends ParquetGroupConverter(updater) with Logging {
 
@@ -266,18 +261,18 @@ private[parquet] class ParquetRowConverter(
 
       case TimestampType =>
         // TODO Implements `TIMESTAMP_MICROS` once parquet-mr has that.
-        // If the table has a timezone property, apply the correct 
conversions.  See SPARK-12297.
-        val sessionTzString = 
hadoopConf.get(SQLConf.SESSION_LOCAL_TIMEZONE.key)
-        val sessionTz = Option(sessionTzString).map(TimeZone.getTimeZone(_))
-          .getOrElse(DateTimeUtils.defaultTimeZone())
-        val storageTzString = 
hadoopConf.get(ParquetFileFormat.PARQUET_TIMEZONE_TABLE_PROPERTY)
-        val storageTz = 
Option(storageTzString).map(TimeZone.getTimeZone(_)).getOrElse(sessionTz)
         new ParquetPrimitiveConverter(updater) {
           // Converts nanosecond timestamps stored as INT96
           override def addBinary(value: Binary): Unit = {
-            val timestamp = ParquetRowConverter.binaryToSQLTimestamp(value, 
sessionTz = sessionTz,
-              storageTz = storageTz)
-            updater.setLong(timestamp)
+            assert(
+              value.length() == 12,
+              "Timestamps (with nanoseconds) are expected to be stored in 
12-byte long binaries, " +
+              s"but got a ${value.length()}-byte binary.")
+
+            val buf = value.toByteBuffer.order(ByteOrder.LITTLE_ENDIAN)
+            val timeOfDayNanos = buf.getLong
+            val julianDay = buf.getInt
+            updater.setLong(DateTimeUtils.fromJulianDay(julianDay, 
timeOfDayNanos))
           }
         }
 
@@ -307,7 +302,7 @@ private[parquet] class ParquetRowConverter(
 
       case t: StructType =>
         new ParquetRowConverter(
-          schemaConverter, parquetType.asGroupType(), t, hadoopConf, new 
ParentContainerUpdater {
+          schemaConverter, parquetType.asGroupType(), t, new 
ParentContainerUpdater {
             override def set(value: Any): Unit = 
updater.set(value.asInstanceOf[InternalRow].copy())
           })
 
@@ -656,7 +651,6 @@ private[parquet] class ParquetRowConverter(
 }
 
 private[parquet] object ParquetRowConverter {
-
   def binaryToUnscaledLong(binary: Binary): Long = {
     // The underlying `ByteBuffer` implementation is guaranteed to be 
`HeapByteBuffer`, so here
     // we are using `Binary.toByteBuffer.array()` to steal the underlying byte 
array without
@@ -679,35 +673,12 @@ private[parquet] object ParquetRowConverter {
     unscaled
   }
 
-  /**
-   * Converts an int96 to a SQLTimestamp, given both the storage timezone and 
the local timezone.
-   * The timestamp is really meant to be interpreted as a "floating time", but 
since we
-   * actually store it as micros since epoch, why we have to apply a 
conversion when timezones
-   * change.
-   *
-   * @param binary a parquet Binary which holds one int96
-   * @param sessionTz the session timezone.  This will be used to determine 
how to display the time,
-    *                  and compute functions on the timestamp which involve a 
timezone, eg. extract
-    *                  the hour.
-   * @param storageTz the timezone which was used to store the timestamp.  
This should come from the
-    *                  timestamp table property, or else assume its the same 
as the sessionTz
-   * @return a timestamp (millis since epoch) which will render correctly in 
the sessionTz
-   */
-  def binaryToSQLTimestamp(
-      binary: Binary,
-      sessionTz: TimeZone,
-      storageTz: TimeZone): SQLTimestamp = {
+  def binaryToSQLTimestamp(binary: Binary): SQLTimestamp = {
     assert(binary.length() == 12, s"Timestamps (with nanoseconds) are expected 
to be stored in" +
       s" 12-byte long binaries. Found a ${binary.length()}-byte binary 
instead.")
     val buffer = binary.toByteBuffer.order(ByteOrder.LITTLE_ENDIAN)
     val timeOfDayNanos = buffer.getLong
     val julianDay = buffer.getInt
-    val utcEpochMicros = DateTimeUtils.fromJulianDay(julianDay, timeOfDayNanos)
-    // avoid expensive time logic if possible.
-    if (sessionTz.getID() != storageTz.getID()) {
-      DateTimeUtils.convertTz(utcEpochMicros, sessionTz, storageTz)
-    } else {
-      utcEpochMicros
-    }
+    DateTimeUtils.fromJulianDay(julianDay, timeOfDayNanos)
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/ac1ab6b9/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala
index 679ed8e..38b0e33 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala
@@ -19,7 +19,6 @@ package org.apache.spark.sql.execution.datasources.parquet
 
 import java.nio.{ByteBuffer, ByteOrder}
 import java.util
-import java.util.TimeZone
 
 import scala.collection.JavaConverters.mapAsJavaMapConverter
 
@@ -76,9 +75,6 @@ private[parquet] class ParquetWriteSupport extends 
WriteSupport[InternalRow] wit
   // Reusable byte array used to write decimal values
   private val decimalBuffer = new 
Array[Byte](minBytesForPrecision(DecimalType.MAX_PRECISION))
 
-  private var storageTz: TimeZone = _
-  private var sessionTz: TimeZone = _
-
   override def init(configuration: Configuration): WriteContext = {
     val schemaString = configuration.get(ParquetWriteSupport.SPARK_ROW_SCHEMA)
     this.schema = StructType.fromString(schemaString)
@@ -95,19 +91,6 @@ private[parquet] class ParquetWriteSupport extends 
WriteSupport[InternalRow] wit
 
 
     this.rootFieldWriters = schema.map(_.dataType).map(makeWriter)
-    // If the table has a timezone property, apply the correct conversions.  
See SPARK-12297.
-    val sessionTzString = configuration.get(SQLConf.SESSION_LOCAL_TIMEZONE.key)
-    sessionTz = if (sessionTzString == null || sessionTzString == "") {
-      TimeZone.getDefault()
-    } else {
-      TimeZone.getTimeZone(sessionTzString)
-    }
-    val storageTzString = 
configuration.get(ParquetFileFormat.PARQUET_TIMEZONE_TABLE_PROPERTY)
-    storageTz = if (storageTzString == null || storageTzString == "") {
-      sessionTz
-    } else {
-      TimeZone.getTimeZone(storageTzString)
-    }
 
     val messageType = new ParquetSchemaConverter(configuration).convert(schema)
     val metadata = Map(ParquetReadSupport.SPARK_METADATA_KEY -> 
schemaString).asJava
@@ -195,13 +178,7 @@ private[parquet] class ParquetWriteSupport extends 
WriteSupport[InternalRow] wit
 
           // NOTE: Starting from Spark 1.5, Spark SQL `TimestampType` only has 
microsecond
           // precision.  Nanosecond parts of timestamp values read from INT96 
are simply stripped.
-          val rawMicros = row.getLong(ordinal)
-          val adjustedMicros = if (sessionTz.getID() == storageTz.getID()) {
-            rawMicros
-          } else {
-            DateTimeUtils.convertTz(rawMicros, storageTz, sessionTz)
-          }
-          val (julianDay, timeOfDayNanos) = 
DateTimeUtils.toJulianDay(adjustedMicros)
+          val (julianDay, timeOfDayNanos) = 
DateTimeUtils.toJulianDay(row.getLong(ordinal))
           val buf = ByteBuffer.wrap(timestampBuffer)
           
buf.order(ByteOrder.LITTLE_ENDIAN).putLong(timeOfDayNanos).putInt(julianDay)
           recordConsumer.addBinary(Binary.fromReusedByteArray(timestampBuffer))

http://git-wip-us.apache.org/repos/asf/spark/blob/ac1ab6b9/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
index 8fef467..ba48fac 100644
--- 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
+++ 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
@@ -39,10 +39,9 @@ import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils._
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.logical.ColumnStat
-import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils}
+import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
 import org.apache.spark.sql.execution.command.DDLUtils
 import org.apache.spark.sql.execution.datasources.PartitioningUtils
-import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
 import org.apache.spark.sql.hive.client.HiveClient
 import org.apache.spark.sql.internal.HiveSerDe
 import org.apache.spark.sql.internal.StaticSQLConf._
@@ -225,14 +224,6 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, 
hadoopConf: Configurat
       throw new TableAlreadyExistsException(db = db, table = table)
     }
 
-    val tableTz = 
tableDefinition.properties.get(ParquetFileFormat.PARQUET_TIMEZONE_TABLE_PROPERTY)
-    tableTz.foreach { tz =>
-      if (!DateTimeUtils.isValidTimezone(tz)) {
-        throw new AnalysisException(s"Cannot set" +
-          s" ${ParquetFileFormat.PARQUET_TIMEZONE_TABLE_PROPERTY} to invalid 
timezone $tz")
-      }
-    }
-
     if (tableDefinition.tableType == VIEW) {
       client.createTable(tableDefinition, ignoreIfExists)
     } else {

http://git-wip-us.apache.org/repos/asf/spark/blob/ac1ab6b9/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index e0b565c..6b98066 100644
--- 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -29,7 +29,6 @@ import org.apache.spark.sql.catalyst.{QualifiedTableName, 
TableIdentifier}
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.execution.datasources._
-import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
 import org.apache.spark.sql.internal.SQLConf.HiveCaseSensitiveInferenceMode._
 import org.apache.spark.sql.types._
 
@@ -175,7 +174,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: 
SparkSession) extends Log
             // We don't support hive bucketed tables, only ones we write out.
             bucketSpec = None,
             fileFormat = fileFormat,
-            options = options ++ getStorageTzOptions(relation))(sparkSession = 
sparkSession)
+            options = options)(sparkSession = sparkSession)
           val created = LogicalRelation(fsRelation, updatedTable)
           tableRelationCache.put(tableIdentifier, created)
           created
@@ -202,7 +201,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: 
SparkSession) extends Log
                 userSpecifiedSchema = Option(dataSchema),
                 // We don't support hive bucketed tables, only ones we write 
out.
                 bucketSpec = None,
-                options = options ++ getStorageTzOptions(relation),
+                options = options,
                 className = fileType).resolveRelation(),
               table = updatedTable)
 
@@ -223,13 +222,6 @@ private[hive] class HiveMetastoreCatalog(sparkSession: 
SparkSession) extends Log
     result.copy(output = newOutput)
   }
 
-  private def getStorageTzOptions(relation: CatalogRelation): Map[String, 
String] = {
-    // We add the table timezone to the relation options, which automatically 
gets injected into the
-    // hadoopConf for the Parquet Converters
-    val storageTzKey = ParquetFileFormat.PARQUET_TIMEZONE_TABLE_PROPERTY
-    relation.tableMeta.properties.get(storageTzKey).map(storageTzKey -> 
_).toMap
-  }
-
   private def inferIfNeeded(
       relation: CatalogRelation,
       options: Map[String, String],

http://git-wip-us.apache.org/repos/asf/spark/blob/ac1ab6b9/sql/hive/src/test/scala/org/apache/spark/sql/hive/ParquetHiveCompatibilitySuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/ParquetHiveCompatibilitySuite.scala
 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/ParquetHiveCompatibilitySuite.scala
index 2bfd63d..05b6059 100644
--- 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/ParquetHiveCompatibilitySuite.scala
+++ 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/ParquetHiveCompatibilitySuite.scala
@@ -17,22 +17,12 @@
 
 package org.apache.spark.sql.hive
 
-import java.io.File
-import java.net.URLDecoder
 import java.sql.Timestamp
-import java.util.TimeZone
 
-import org.apache.hadoop.fs.{FileSystem, Path}
-import org.apache.parquet.hadoop.ParquetFileReader
-import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName
-
-import org.apache.spark.sql.{AnalysisException, Dataset, Row, SparkSession}
-import org.apache.spark.sql.catalyst.TableIdentifier
-import 
org.apache.spark.sql.execution.datasources.parquet.{ParquetCompatibilityTest, 
ParquetFileFormat}
-import org.apache.spark.sql.functions._
+import org.apache.spark.sql.Row
+import 
org.apache.spark.sql.execution.datasources.parquet.ParquetCompatibilityTest
 import org.apache.spark.sql.hive.test.TestHiveSingleton
 import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.types.{StringType, StructType, TimestampType}
 
 class ParquetHiveCompatibilitySuite extends ParquetCompatibilityTest with 
TestHiveSingleton {
   /**
@@ -151,369 +141,4 @@ class ParquetHiveCompatibilitySuite extends ParquetCompatibilityTest with TestHi
       Row(Seq(Row(1))),
       "ARRAY<STRUCT<array_element: INT>>")
   }
-
-  val testTimezones = Seq(
-    "UTC" -> "UTC",
-    "LA" -> "America/Los_Angeles",
-    "Berlin" -> "Europe/Berlin"
-  )
-  // Check creating parquet tables with timestamps, writing data into them, 
and reading it back out
-  // under a variety of conditions:
-  // * tables with explicit tz and those without
-  // * altering table properties directly
-  // * variety of timezones, local & non-local
-  val sessionTimezones = testTimezones.map(_._2).map(Some(_)) ++ Seq(None)
-  sessionTimezones.foreach { sessionTzOpt =>
-    val sparkSession = spark.newSession()
-    sessionTzOpt.foreach { tz => 
sparkSession.conf.set(SQLConf.SESSION_LOCAL_TIMEZONE.key, tz) }
-    testCreateWriteRead(sparkSession, "no_tz", None, sessionTzOpt)
-    val localTz = TimeZone.getDefault.getID()
-    testCreateWriteRead(sparkSession, "local", Some(localTz), sessionTzOpt)
-    // check with a variety of timezones.  The unit tests currently are 
configured to always use
-    // America/Los_Angeles, but even if they didn't, we'd be sure to cover a 
non-local timezone.
-    testTimezones.foreach { case (tableName, zone) =>
-      if (zone != localTz) {
-        testCreateWriteRead(sparkSession, tableName, Some(zone), sessionTzOpt)
-      }
-    }
-  }
-
-  private def testCreateWriteRead(
-      sparkSession: SparkSession,
-      baseTable: String,
-      explicitTz: Option[String],
-      sessionTzOpt: Option[String]): Unit = {
-    testCreateAlterTablesWithTimezone(sparkSession, baseTable, explicitTz, 
sessionTzOpt)
-    testWriteTablesWithTimezone(sparkSession, baseTable, explicitTz, 
sessionTzOpt)
-    testReadTablesWithTimezone(sparkSession, baseTable, explicitTz, 
sessionTzOpt)
-  }
-
-  private def checkHasTz(spark: SparkSession, table: String, tz: 
Option[String]): Unit = {
-    val tableMetadata = 
spark.sessionState.catalog.getTableMetadata(TableIdentifier(table))
-    assert(tableMetadata.properties.get(ParquetFileFormat.PARQUET_TIMEZONE_TABLE_PROPERTY) === tz)
-  }
-
-  private def testCreateAlterTablesWithTimezone(
-      spark: SparkSession,
-      baseTable: String,
-      explicitTz: Option[String],
-      sessionTzOpt: Option[String]): Unit = {
-    test(s"SPARK-12297: Create and Alter Parquet tables and timezones; explicitTz = $explicitTz; " +
-      s"sessionTzOpt = $sessionTzOpt") {
-      val key = ParquetFileFormat.PARQUET_TIMEZONE_TABLE_PROPERTY
-      withTable(baseTable, s"like_$baseTable", s"select_$baseTable", 
s"partitioned_$baseTable") {
-        // If we ever add a property to set the table timezone by default, 
defaultTz would change
-        val defaultTz = None
-        // check that created tables have correct TBLPROPERTIES
-        val tblProperties = explicitTz.map {
-          tz => s"""TBLPROPERTIES ($key="$tz")"""
-        }.getOrElse("")
-        spark.sql(
-          s"""CREATE TABLE $baseTable (
-                |  x int
-                | )
-                | STORED AS PARQUET
-                | $tblProperties
-            """.stripMargin)
-        val expectedTableTz = explicitTz.orElse(defaultTz)
-        checkHasTz(spark, baseTable, expectedTableTz)
-        spark.sql(
-          s"""CREATE TABLE partitioned_$baseTable (
-                |  x int
-                | )
-                | PARTITIONED BY (y int)
-                | STORED AS PARQUET
-                | $tblProperties
-            """.stripMargin)
-        checkHasTz(spark, s"partitioned_$baseTable", expectedTableTz)
-        spark.sql(s"CREATE TABLE like_$baseTable LIKE $baseTable")
-        checkHasTz(spark, s"like_$baseTable", expectedTableTz)
-        spark.sql(
-          s"""CREATE TABLE select_$baseTable
-                | STORED AS PARQUET
-                | AS
-                | SELECT * from $baseTable
-            """.stripMargin)
-        checkHasTz(spark, s"select_$baseTable", defaultTz)
-
-        // check alter table, setting, unsetting, resetting the property
-        spark.sql(
-          s"""ALTER TABLE $baseTable SET TBLPROPERTIES ($key="America/Los_Angeles")""")
-        checkHasTz(spark, baseTable, Some("America/Los_Angeles"))
-        spark.sql(s"""ALTER TABLE $baseTable SET TBLPROPERTIES ($key="UTC")""")
-        checkHasTz(spark, baseTable, Some("UTC"))
-        spark.sql(s"""ALTER TABLE $baseTable UNSET TBLPROPERTIES ($key)""")
-        checkHasTz(spark, baseTable, None)
-        explicitTz.foreach { tz =>
-          spark.sql(s"""ALTER TABLE $baseTable SET TBLPROPERTIES ($key="$tz")""")
-          checkHasTz(spark, baseTable, expectedTableTz)
-        }
-      }
-    }
-  }
-
-  val desiredTimestampStrings = Seq(
-    "2015-12-31 22:49:59.123",
-    "2015-12-31 23:50:59.123",
-    "2016-01-01 00:39:59.123",
-    "2016-01-01 01:29:59.123"
-  )
-  // We don't want to mess with timezones inside the tests themselves, since 
we use a shared
-  // spark context, and then we might be prone to issues from lazy vals for 
timezones.  Instead,
-  // we manually adjust the timezone just to determine what the desired millis 
(since epoch, in utc)
-  // is for various "wall-clock" times in different timezones, and then we can 
compare against those
-  // in our tests.
-  val timestampTimezoneToMillis = {
-    val originalTz = TimeZone.getDefault
-    try {
-      desiredTimestampStrings.flatMap { timestampString =>
-        Seq("America/Los_Angeles", "Europe/Berlin", "UTC").map { tzId =>
-          TimeZone.setDefault(TimeZone.getTimeZone(tzId))
-          val timestamp = Timestamp.valueOf(timestampString)
-          (timestampString, tzId) -> timestamp.getTime()
-        }
-      }.toMap
-    } finally {
-      TimeZone.setDefault(originalTz)
-    }
-  }
-
-  private def createRawData(spark: SparkSession): Dataset[(String, Timestamp)] 
= {
-    import spark.implicits._
-    val df = desiredTimestampStrings.toDF("display")
-    // this will get the millis corresponding to the display time given the 
current *session*
-    // timezone.
-    df.withColumn("ts", expr("cast(display as timestamp)")).as[(String, 
Timestamp)]
-  }
-
-  private def testWriteTablesWithTimezone(
-      spark: SparkSession,
-      baseTable: String,
-      explicitTz: Option[String],
-      sessionTzOpt: Option[String]) : Unit = {
-    val key = ParquetFileFormat.PARQUET_TIMEZONE_TABLE_PROPERTY
-    test(s"SPARK-12297: Write to Parquet tables with Timestamps; explicitTz = $explicitTz; " +
-        s"sessionTzOpt = $sessionTzOpt") {
-
-      withTable(s"saveAsTable_$baseTable", s"insert_$baseTable", 
s"partitioned_ts_$baseTable") {
-        val sessionTzId = sessionTzOpt.getOrElse(TimeZone.getDefault().getID())
-        // check that created tables have correct TBLPROPERTIES
-        val tblProperties = explicitTz.map {
-          tz => s"""TBLPROPERTIES ($key="$tz")"""
-        }.getOrElse("")
-
-        val rawData = createRawData(spark)
-        // Check writing data out.
-        // We write data into our tables, and then check the raw parquet files 
to see whether
-        // the correct conversion was applied.
-        rawData.write.saveAsTable(s"saveAsTable_$baseTable")
-        checkHasTz(spark, s"saveAsTable_$baseTable", None)
-        spark.sql(
-          s"""CREATE TABLE insert_$baseTable (
-                |  display string,
-                |  ts timestamp
-                | )
-                | STORED AS PARQUET
-                | $tblProperties
-               """.stripMargin)
-        checkHasTz(spark, s"insert_$baseTable", explicitTz)
-        rawData.write.insertInto(s"insert_$baseTable")
-        // no matter what, roundtripping via the table should leave the data 
unchanged
-        val readFromTable = spark.table(s"insert_$baseTable").collect()
-          .map { row => (row.getAs[String](0), 
row.getAs[Timestamp](1)).toString() }.sorted
-        assert(readFromTable === rawData.collect().map(_.toString()).sorted)
-
-        // Now we load the raw parquet data on disk, and check if it was 
adjusted correctly.
-        // Note that we only store the timezone in the table property, so when 
we read the
-        // data this way, we're bypassing all of the conversion logic, and 
reading the raw
-        // values in the parquet file.
-        val onDiskLocation = spark.sessionState.catalog
-          
.getTableMetadata(TableIdentifier(s"insert_$baseTable")).location.getPath
-        // we test reading the data back with and without the vectorized 
reader, to make sure we
-        // haven't broken reading parquet from non-hive tables, with both 
readers.
-        Seq(false, true).foreach { vectorized =>
-          spark.conf.set(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key, 
vectorized)
-          val readFromDisk = spark.read.parquet(onDiskLocation).collect()
-          val storageTzId = explicitTz.getOrElse(sessionTzId)
-          readFromDisk.foreach { row =>
-            val displayTime = row.getAs[String](0)
-            val millis = row.getAs[Timestamp](1).getTime()
-            val expectedMillis = timestampTimezoneToMillis((displayTime, 
storageTzId))
-            assert(expectedMillis === millis, s"Display time '$displayTime' 
was stored " +
-              s"incorrectly with sessionTz = ${sessionTzOpt}; Got $millis, 
expected " +
-              s"$expectedMillis (delta = ${millis - expectedMillis})")
-          }
-        }
-
-        // check tables partitioned by timestamps.  We don't compare the "raw" 
data in this case,
-        // since they are adjusted even when we bypass the hive table.
-        
rawData.write.partitionBy("ts").saveAsTable(s"partitioned_ts_$baseTable")
-        val partitionDiskLocation = spark.sessionState.catalog
-          
.getTableMetadata(TableIdentifier(s"partitioned_ts_$baseTable")).location.getPath
-        // no matter what mix of timezones we use, the dirs should specify the 
value with the
-        // same time we use for display.
-        val parts = new File(partitionDiskLocation).list().collect {
-          case name if name.startsWith("ts=") => 
URLDecoder.decode(name.stripPrefix("ts="))
-        }.toSet
-        assert(parts === desiredTimestampStrings.toSet)
-      }
-    }
-  }
-
-  private def testReadTablesWithTimezone(
-      spark: SparkSession,
-      baseTable: String,
-      explicitTz: Option[String],
-      sessionTzOpt: Option[String]): Unit = {
-    val key = ParquetFileFormat.PARQUET_TIMEZONE_TABLE_PROPERTY
-    test(s"SPARK-12297: Read from Parquet tables with Timestamps; explicitTz = $explicitTz; " +
-      s"sessionTzOpt = $sessionTzOpt") {
-      withTable(s"external_$baseTable", s"partitioned_$baseTable") {
-        // we intentionally save this data directly, without creating a table, 
so we can
-        // see that the data is read back differently depending on table 
properties.
-        // we'll save with adjusted millis, so that it should be the correct 
millis after reading
-        // back.
-        val rawData = createRawData(spark)
-        // to avoid closing over entire class
-        val timestampTimezoneToMillis = this.timestampTimezoneToMillis
-        import spark.implicits._
-        val adjustedRawData = (explicitTz match {
-          case Some(tzId) =>
-            rawData.map { case (displayTime, _) =>
-              val storageMillis = timestampTimezoneToMillis((displayTime, 
tzId))
-              (displayTime, new Timestamp(storageMillis))
-            }
-          case _ =>
-            rawData
-        }).withColumnRenamed("_1", "display").withColumnRenamed("_2", "ts")
-        withTempPath { basePath =>
-          val unpartitionedPath = new File(basePath, "flat")
-          val partitionedPath = new File(basePath, "partitioned")
-          adjustedRawData.write.parquet(unpartitionedPath.getCanonicalPath)
-          val options = Map("path" -> unpartitionedPath.getCanonicalPath) ++
-            explicitTz.map { tz => Map(key -> tz) }.getOrElse(Map())
-
-          spark.catalog.createTable(
-            tableName = s"external_$baseTable",
-            source = "parquet",
-            schema = new StructType().add("display", StringType).add("ts", 
TimestampType),
-            options = options
-          )
-
-          // also write out a partitioned table, to make sure we can access 
that correctly.
-          // add a column we can partition by (value doesn't particularly 
matter).
-          val partitionedData = adjustedRawData.withColumn("id", 
monotonicallyIncreasingId)
-          partitionedData.write.partitionBy("id")
-            .parquet(partitionedPath.getCanonicalPath)
-          // unfortunately, catalog.createTable() doesn't let us specify 
partitioning, so just use
-          // a "CREATE TABLE" stmt.
-          val tblOpts = explicitTz.map { tz => s"""TBLPROPERTIES ($key="$tz")""" }.getOrElse("")
-          spark.sql(s"""CREATE EXTERNAL TABLE partitioned_$baseTable (
-                         |  display string,
-                         |  ts timestamp
-                         |)
-                         |PARTITIONED BY (id bigint)
-                         |STORED AS parquet
-                         |LOCATION 'file:${partitionedPath.getCanonicalPath}'
-                         |$tblOpts
-                          """.stripMargin)
-          spark.sql(s"msck repair table partitioned_$baseTable")
-
-          for {
-            vectorized <- Seq(false, true)
-            partitioned <- Seq(false, true)
-          } {
-            withClue(s"vectorized = $vectorized; partitioned = $partitioned") {
-              spark.conf.set(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key, 
vectorized)
-              val sessionTz = 
sessionTzOpt.getOrElse(TimeZone.getDefault().getID())
-              val table = if (partitioned) s"partitioned_$baseTable" else 
s"external_$baseTable"
-              val query = s"select display, cast(ts as string) as 
ts_as_string, ts " +
-                s"from $table"
-              val collectedFromExternal = spark.sql(query).collect()
-              assert( collectedFromExternal.size === 4)
-              collectedFromExternal.foreach { row =>
-                val displayTime = row.getAs[String](0)
-                // the timestamp should still display the same, despite the 
changes in timezones
-                assert(displayTime === row.getAs[String](1).toString())
-                // we'll also check that the millis behind the timestamp has 
the appropriate
-                // adjustments.
-                val millis = row.getAs[Timestamp](2).getTime()
-                val expectedMillis = timestampTimezoneToMillis((displayTime, 
sessionTz))
-                val delta = millis - expectedMillis
-                val deltaHours = delta / (1000L * 60 * 60)
-                assert(millis === expectedMillis, s"Display time 
'$displayTime' did not have " +
-                  s"correct millis: was $millis, expected $expectedMillis; 
delta = $delta " +
-                  s"($deltaHours hours)")
-              }
-
-              // Now test that the behavior is still correct even with a 
filter which could get
-              // pushed down into parquet.  We don't need extra handling for 
pushed down
-              // predicates because (a) in ParquetFilters, we ignore 
TimestampType and (b) parquet
-              // does not read statistics from int96 fields, as they are 
unsigned.  See
-              // scalastyle:off line.size.limit
-              // https://github.com/apache/parquet-mr/blob/2fd62ee4d524c270764e9b91dca72e5cf1a005b7/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java#L419
-              // https://github.com/apache/parquet-mr/blob/2fd62ee4d524c270764e9b91dca72e5cf1a005b7/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java#L348
-              // scalastyle:on line.size.limit
-              //
-              // Just to be defensive in case anything ever changes in 
parquet, this test checks
-              // the assumption on column stats, and also the end-to-end 
behavior.
-
-              val hadoopConf = sparkContext.hadoopConfiguration
-              val fs = FileSystem.get(hadoopConf)
-              val parts = if (partitioned) {
-                val subdirs = fs.listStatus(new 
Path(partitionedPath.getCanonicalPath))
-                  .filter(_.getPath().getName().startsWith("id="))
-                fs.listStatus(subdirs.head.getPath())
-                  .filter(_.getPath().getName().endsWith(".parquet"))
-              } else {
-                fs.listStatus(new Path(unpartitionedPath.getCanonicalPath))
-                  .filter(_.getPath().getName().endsWith(".parquet"))
-              }
-              // grab the meta data from the parquet file.  The next section 
of asserts just make
-              // sure the test is configured correctly.
-              assert(parts.size == 1)
-              val oneFooter = ParquetFileReader.readFooter(hadoopConf, 
parts.head.getPath)
-              assert(oneFooter.getFileMetaData.getSchema.getColumns.size === 2)
-              
assert(oneFooter.getFileMetaData.getSchema.getColumns.get(1).getType() ===
-                PrimitiveTypeName.INT96)
-              val oneBlockMeta = oneFooter.getBlocks().get(0)
-              val oneBlockColumnMeta = oneBlockMeta.getColumns().get(1)
-              val columnStats = oneBlockColumnMeta.getStatistics
-              // This is the important assert.  Column stats are written, but 
they are ignored
-              // when the data is read back as mentioned above, b/c int96 is 
unsigned.  This
-              // assert makes sure this holds even if we change parquet 
versions (if eg. there
-              // were ever statistics even on unsigned columns).
-              assert(columnStats.isEmpty)
-
-              // These queries should return the entire dataset, but if the 
predicates were
-              // applied to the raw values in parquet, they would incorrectly 
filter data out.
-              Seq(
-                ">" -> "2015-12-31 22:00:00",
-                "<" -> "2016-01-01 02:00:00"
-              ).foreach { case (comparison, value) =>
-                val query =
-                  s"select ts from $table where ts $comparison '$value'"
-                val countWithFilter = spark.sql(query).count()
-                assert(countWithFilter === 4, query)
-              }
-            }
-          }
-        }
-      }
-    }
-  }
-
-  test("SPARK-12297: exception on bad timezone") {
-    val key = ParquetFileFormat.PARQUET_TIMEZONE_TABLE_PROPERTY
-    val badTzException = intercept[AnalysisException] {
-      spark.sql(
-        s"""CREATE TABLE bad_tz_table (
-              |  x int
-              | )
-              | STORED AS PARQUET
-              | TBLPROPERTIES ($key="Blart Versenwald III")
-            """.stripMargin)
-    }
-    assert(badTzException.getMessage.contains("Blart Versenwald III"))
-  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to