This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 777b797  [SPARK-27522][SQL][TEST] Test migration from INT96 to TIMESTAMP_MICROS for timestamps in parquet
777b797 is described below

commit 777b797867045c9717813e9dab2ab9012a1889fc
Author: Maxim Gekk <max.g...@gmail.com>
AuthorDate: Mon Apr 22 16:34:13 2019 +0900

    [SPARK-27522][SQL][TEST] Test migration from INT96 to TIMESTAMP_MICROS for timestamps in parquet
    
    ## What changes were proposed in this pull request?
    
    Added tests to check migration from `INT96` to `TIMESTAMP_MICROS` (`INT64`) for timestamps in parquet files (see the sketch after this list). In particular:
    - Append `TIMESTAMP_MICROS` timestamps to **existing parquet** files with `INT96` timestamps
    - Append `TIMESTAMP_MICROS` timestamps to a table with `INT96` timestamps
    - Append `INT96` timestamps to **parquet files** with `TIMESTAMP_MICROS` timestamps
    - Append `INT96` timestamps to a **table** with `TIMESTAMP_MICROS` timestamps
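
    A minimal sketch (not part of this commit) of the migration scenario the tests cover: write one batch with `INT96` timestamps, append a second batch written as `TIMESTAMP_MICROS`, and read the mixed files back. The output path and local session setup are illustrative assumptions, not taken from the patch.

    ```scala
    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder().master("local[*]").getOrCreate()
    import spark.implicits._

    val path = "/tmp/parquet_ts_migration"  // hypothetical output path

    // First write: store timestamps using the INT96 physical type.
    spark.conf.set("spark.sql.parquet.outputTimestampType", "INT96")
    Seq(java.sql.Timestamp.valueOf("2019-04-22 16:34:13"))
      .toDF("time").write.parquet(path)

    // Append: store timestamps as TIMESTAMP_MICROS (INT64).
    spark.conf.set("spark.sql.parquet.outputTimestampType", "TIMESTAMP_MICROS")
    Seq(java.sql.Timestamp.valueOf("2019-04-23 00:00:00"))
      .toDF("time").write.mode("append").parquet(path)

    // Reading back should return both rows regardless of the physical type.
    spark.read.parquet(path).show(false)
    ```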
    
    Closes #24417 from MaxGekk/parquet-timestamp-int64-tests.
    
    Authored-by: Maxim Gekk <max.g...@gmail.com>
    Signed-off-by: HyukjinKwon <gurwls...@apache.org>
---
 .../datasources/parquet/ParquetQuerySuite.scala    | 43 ++++++++++++++++++++++
 1 file changed, 43 insertions(+)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
index 8cc3bee..4959275 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.sql.execution.datasources.parquet
 
 import java.io.File
+import java.util.concurrent.TimeUnit
 
 import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.parquet.hadoop.ParquetOutputFormat
@@ -881,6 +882,48 @@ class ParquetQuerySuite extends QueryTest with ParquetTest with SharedSQLContext
     }
   }
 
+  test("Migration from INT96 to TIMESTAMP_MICROS timestamp type") {
+    def testMigration(fromTsType: String, toTsType: String): Unit = {
+      def checkAppend(write: DataFrameWriter[_] => Unit, readback: => DataFrame): Unit = {
+        def data(start: Int, end: Int): Seq[Row] = (start to end).map { i =>
+          val ts = new java.sql.Timestamp(TimeUnit.SECONDS.toMillis(i))
+          ts.setNanos(123456000)
+          Row(ts)
+        }
+        val schema = new StructType().add("time", TimestampType)
+        val df1 = spark.createDataFrame(sparkContext.parallelize(data(0, 1)), schema)
+        withSQLConf(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key -> fromTsType) {
+          write(df1.write)
+        }
+        val df2 = spark.createDataFrame(sparkContext.parallelize(data(2, 10)), schema)
+        withSQLConf(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key -> toTsType) {
+          write(df2.write.mode(SaveMode.Append))
+        }
+        Seq("true", "false").foreach { vectorized =>
+          withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> vectorized) {
+            checkAnswer(readback, df1.unionAll(df2))
+          }
+        }
+      }
+
+      Seq(false, true).foreach { mergeSchema =>
+        withTempPath { file =>
+          checkAppend(_.parquet(file.getCanonicalPath),
+            spark.read.option("mergeSchema", mergeSchema).parquet(file.getCanonicalPath))
+        }
+
+        withSQLConf(SQLConf.PARQUET_SCHEMA_MERGING_ENABLED.key -> mergeSchema.toString) {
+          val tableName = "parquet_timestamp_migration"
+          withTable(tableName) {
+            checkAppend(_.saveAsTable(tableName), spark.table(tableName))
+          }
+        }
+      }
+    }
+
+    testMigration(fromTsType = "INT96", toTsType = "TIMESTAMP_MICROS")
+    testMigration(fromTsType = "TIMESTAMP_MICROS", toTsType = "INT96")
+  }
 }
 
 object TestingUDT {

