This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new 777b797  [SPARK-27522][SQL][TEST] Test migration from INT96 to TIMESTAMP_MICROS for timestamps in parquet
777b797 is described below

commit 777b797867045c9717813e9dab2ab9012a1889fc
Author: Maxim Gekk <max.g...@gmail.com>
AuthorDate: Mon Apr 22 16:34:13 2019 +0900

    [SPARK-27522][SQL][TEST] Test migration from INT96 to TIMESTAMP_MICROS for timestamps in parquet

    ## What changes were proposed in this pull request?

    Added tests to check migration from `INT96` to `TIMESTAMP_MICROS` (`INT64`) for timestamps in parquet files. In particular:
    - Append `TIMESTAMP_MICROS` timestamps to **existing parquet** files with `INT96` timestamps
    - Append `TIMESTAMP_MICROS` timestamps to a table with `INT96` timestamps
    - Append `INT96` to `TIMESTAMP_MICROS` timestamps in **parquet files**
    - Append `INT96` to `TIMESTAMP_MICROS` timestamps in a **table**

    Closes #24417 from MaxGekk/parquet-timestamp-int64-tests.

    Authored-by: Maxim Gekk <max.g...@gmail.com>
    Signed-off-by: HyukjinKwon <gurwls...@apache.org>
---
 .../datasources/parquet/ParquetQuerySuite.scala | 43 ++++++++++++++++++++++
 1 file changed, 43 insertions(+)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
index 8cc3bee..4959275 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.sql.execution.datasources.parquet
 
 import java.io.File
+import java.util.concurrent.TimeUnit
 
 import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.parquet.hadoop.ParquetOutputFormat
@@ -881,6 +882,48 @@ class ParquetQuerySuite extends QueryTest with ParquetTest with SharedSQLContext
     }
   }
 
+  test("Migration from INT96 to TIMESTAMP_MICROS timestamp type") {
+    def testMigration(fromTsType: String, toTsType: String): Unit = {
+      def checkAppend(write: DataFrameWriter[_] => Unit, readback: => DataFrame): Unit = {
+        def data(start: Int, end: Int): Seq[Row] = (start to end).map { i =>
+          val ts = new java.sql.Timestamp(TimeUnit.SECONDS.toMillis(i))
+          ts.setNanos(123456000)
+          Row(ts)
+        }
+        val schema = new StructType().add("time", TimestampType)
+        val df1 = spark.createDataFrame(sparkContext.parallelize(data(0, 1)), schema)
+        withSQLConf(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key -> fromTsType) {
+          write(df1.write)
+        }
+        val df2 = spark.createDataFrame(sparkContext.parallelize(data(2, 10)), schema)
+        withSQLConf(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key -> toTsType) {
+          write(df2.write.mode(SaveMode.Append))
+        }
+        Seq("true", "false").foreach { vectorized =>
+          withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> vectorized) {
+            checkAnswer(readback, df1.unionAll(df2))
+          }
+        }
+      }
+
+      Seq(false, true).foreach { mergeSchema =>
+        withTempPath { file =>
+          checkAppend(_.parquet(file.getCanonicalPath),
+            spark.read.option("mergeSchema", mergeSchema).parquet(file.getCanonicalPath))
+        }
+
+        withSQLConf(SQLConf.PARQUET_SCHEMA_MERGING_ENABLED.key -> mergeSchema.toString) {
+          val tableName = "parquet_timestamp_migration"
+          withTable(tableName) {
+            checkAppend(_.saveAsTable(tableName), spark.table(tableName))
+          }
+        }
+      }
+    }
+
+    testMigration(fromTsType = "INT96", toTsType = "TIMESTAMP_MICROS")
+    testMigration(fromTsType = "TIMESTAMP_MICROS", toTsType = "INT96")
+  }
 }
 
 object TestingUDT {

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org