Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/19769#discussion_r152055985 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetInteroperabilitySuite.scala --- @@ -87,4 +95,109 @@ class ParquetInteroperabilitySuite extends ParquetCompatibilityTest with SharedS Row(Seq(2, 3)))) } } + + test("parquet timestamp conversion") { + // Make a table with one parquet file written by impala, and one parquet file written by spark. + // We should only adjust the timestamps in the impala file, and only if the conf is set + val impalaFile = "test-data/impala_timestamp.parq" + + // here are the timestamps in the impala file, as they were saved by impala + val impalaFileData = + Seq( + "2001-01-01 01:01:01", + "2002-02-02 02:02:02", + "2003-03-03 03:03:03" + ).map(java.sql.Timestamp.valueOf) + val impalaPath = Thread.currentThread().getContextClassLoader.getResource(impalaFile) + .toURI.getPath + withTempPath { tableDir => + val ts = Seq( + "2004-04-04 04:04:04", + "2005-05-05 05:05:05", + "2006-06-06 06:06:06" + ).map { s => java.sql.Timestamp.valueOf(s) } + import testImplicits._ + // match the column names of the file from impala + val df = spark.createDataset(ts).toDF().repartition(1).withColumnRenamed("value", "ts") + df.write.parquet(tableDir.getAbsolutePath) + FileUtils.copyFile(new File(impalaPath), new File(tableDir, "part-00001.parq")) + + Seq(false, true).foreach { int96TimestampConversion => + Seq(false, true).foreach { vectorized => + withSQLConf( + (SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key, + SQLConf.ParquetOutputTimestampType.INT96.toString), + (SQLConf.PARQUET_INT96_TIMESTAMP_CONVERSION.key, int96TimestampConversion.toString()), + (SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key, vectorized.toString()) + ) { + val readBack = spark.read.parquet(tableDir.getAbsolutePath).collect() + assert(readBack.size === 6) + // if we apply the conversion, we'll get the "right" values, as saved 
by impala in the + // original file. Otherwise, they're off by the local timezone offset, set to + // America/Los_Angeles in tests + val impalaExpectations = if (int96TimestampConversion) { + impalaFileData + } else { + impalaFileData.map { ts => + DateTimeUtils.toJavaTimestamp(DateTimeUtils.convertTz( + DateTimeUtils.fromJavaTimestamp(ts), + DateTimeUtils.TimeZoneUTC, + DateTimeUtils.getTimeZone(conf.sessionLocalTimeZone))) + } + } + val fullExpectations = (ts ++ impalaExpectations).map(_.toString).sorted.toArray + val actual = readBack.map(_.getTimestamp(0).toString).sorted + withClue(s"applyConversion = $int96TimestampConversion; vectorized = $vectorized") { + assert(fullExpectations === actual) + + // Now test that the behavior is still correct even with a filter which could get + // pushed down into parquet. We don't need extra handling for pushed down + // predicates because (a) in ParquetFilters, we ignore TimestampType and (b) parquet --- End diff -- I'd prefer not to, but don't feel very strongly. I don't think it makes sense to add an assert in `ParquetFilters`, just because of the way the code is structured -- it's based on pattern matching in a partial function. And I think a comment there is just likely to get out of date / get ignored, as there isn't a great place for that. But like this, the test will fail and there will be an explanation right here explaining why.
--- --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org