Github user HyukjinKwon commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19769#discussion_r151635947

    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetInteroperabilitySuite.scala ---
    @@ -87,4 +96,113 @@ class ParquetInteroperabilitySuite extends ParquetCompatibilityTest with SharedS
             Row(Seq(2, 3))))
         }
       }
    +
    +  val ImpalaFile = "test-data/impala_timestamp.parq"
    +  test("parquet timestamp conversion") {
    +    // Make a table with one parquet file written by Impala, and one parquet file written by
    +    // Spark. We should only adjust the timestamps in the Impala file, and only if the conf
    +    // is set.
    +
    +    // Here are the timestamps in the Impala file, as they were saved by Impala.
    +    val impalaFileData =
    +      Seq(
    +        "2001-01-01 01:01:01",
    +        "2002-02-02 02:02:02",
    +        "2003-03-03 03:03:03"
    +      ).map { s => java.sql.Timestamp.valueOf(s) }
    +    val impalaFile = Thread.currentThread().getContextClassLoader.getResource(ImpalaFile)
    +      .toURI.getPath
    +    withTempPath { tableDir =>
    +      val ts = Seq(
    +        "2004-04-04 04:04:04",
    +        "2005-05-05 05:05:05",
    +        "2006-06-06 06:06:06"
    +      ).map { s => java.sql.Timestamp.valueOf(s) }
    +      val s = spark
    +      import s.implicits._
    +      // Match the column name of the file from Impala.
    +      val df = spark.createDataset(ts).toDF().repartition(1).withColumnRenamed("value", "ts")
    +      val schema = df.schema
    +      df.write.parquet(tableDir.getAbsolutePath)
    +      FileUtils.copyFile(new File(impalaFile), new File(tableDir, "part-00001.parq"))
    +
    +      Seq(false, true).foreach { applyConversion =>
    +        Seq(false, true).foreach { vectorized =>
    +          withSQLConf(
    +              (SQLConf.PARQUET_INT96_TIMESTAMP_CONVERSION.key, applyConversion.toString()),
    +              (SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key, vectorized.toString())
    +          ) {
    +            val read = spark.read.parquet(tableDir.getAbsolutePath).collect()
    +            assert(read.size === 6)
    +            // If we apply the conversion, we get the "right" values, as saved by Impala in
    +            // the original file. Otherwise, they are off by the local timezone offset, which
    +            // is set to America/Los_Angeles in tests.
    +            val impalaExpectations = if (applyConversion) {
    +              impalaFileData
    +            } else {
    +              impalaFileData.map { ts =>
    +                DateTimeUtils.toJavaTimestamp(DateTimeUtils.convertTz(
    +                  DateTimeUtils.fromJavaTimestamp(ts),
    +                  TimeZone.getTimeZone("UTC"),
    +                  TimeZone.getDefault()))
    +              }
    +            }
    +            val fullExpectations = (ts ++ impalaExpectations).map {
    +              _.toString()
    +            }.sorted.toArray
    +            val actual = read.map {
    +              _.getTimestamp(0).toString()
    +            }.sorted
    +            withClue(s"applyConversion = $applyConversion; vectorized = $vectorized") {
    +              assert(fullExpectations === actual)
    +
    +              // Now test that the behavior is still correct even with a filter which could
    +              // get pushed down into parquet. We don't need extra handling for pushed-down
    +              // predicates because (a) in ParquetFilters, we ignore TimestampType and
    +              // (b) parquet does not read statistics from int96 fields, as they are
    +              // unsigned. See
    +              // scalastyle:off line.size.limit
    +              // https://github.com/apache/parquet-mr/blob/2fd62ee4d524c270764e9b91dca72e5cf1a005b7/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java#L419
    +              // https://github.com/apache/parquet-mr/blob/2fd62ee4d524c270764e9b91dca72e5cf1a005b7/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java#L348
    +              // scalastyle:on line.size.limit
    +              //
    +              // Just to be defensive in case anything ever changes in parquet, this test
    +              // checks the assumption on column stats, and also the end-to-end behavior.
    +
    +              val hadoopConf = sparkContext.hadoopConfiguration
    +              val fs = FileSystem.get(hadoopConf)
    +              val parts = fs.listStatus(new Path(tableDir.getAbsolutePath), new PathFilter {
    +                override def accept(path: Path): Boolean = !path.getName.startsWith("_")
    +              })
    +              // Grab the metadata from the parquet files. The next section of asserts just
    +              // makes sure the test is configured correctly.
    +              assert(parts.size == 2)
    +              parts.map { part =>
    +                val oneFooter =
    --- End diff --

    I just found it shorter; I am fine with it.
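For readers following the expectations above: the "off by the local timezone offset" case can be reproduced outside the test. Below is a minimal sketch, assuming the JVM default timezone is America/Los_Angeles as in Spark's test harness; `shiftedWhenConversionOff` is a hypothetical helper name, and `DateTimeUtils` is Spark-internal API used exactly as in the diff:

    import java.util.TimeZone
    import org.apache.spark.sql.catalyst.util.DateTimeUtils

    // Sketch of the adjustment the test expects when
    // SQLConf.PARQUET_INT96_TIMESTAMP_CONVERSION is disabled: the raw INT96
    // value is treated as a UTC instant and rendered in the JVM default zone.
    def shiftedWhenConversionOff(ts: java.sql.Timestamp): java.sql.Timestamp = {
      DateTimeUtils.toJavaTimestamp(DateTimeUtils.convertTz(
        DateTimeUtils.fromJavaTimestamp(ts),
        TimeZone.getTimeZone("UTC"),
        TimeZone.getDefault()))
    }

    // With America/Los_Angeles (UTC-8 in January) as the default zone, the
    // Impala-saved "2001-01-01 01:01:01" should read back shifted by -8 hours,
    // i.e. as 2000-12-31 17:01:01.0.
    println(shiftedWhenConversionOff(java.sql.Timestamp.valueOf("2001-01-01 01:01:01")))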
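As a usage note, the first conf in the loop is the end-user switch exercised by this test. Here is a sketch of turning it on when reading Impala-written tables; the key string is assumed to match SQLConf.PARQUET_INT96_TIMESTAMP_CONVERSION from the diff, and the table path is a placeholder:

    import org.apache.spark.sql.SparkSession

    val session = SparkSession.builder()
      .appName("int96-conversion-demo")
      // Adjust INT96 timestamps written by tools (such as Impala) that do not
      // shift them to UTC on write, so they read back as the saved wall clock.
      .config("spark.sql.parquet.int96TimestampConversion", "true")
      .getOrCreate()

    // Placeholder path: any directory containing Impala-written parquet files.
    session.read.parquet("/data/impala_tables/t").show()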
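The diff is cut off at the footer inspection (`val oneFooter =`). For context on what the column-stats assumption looks like when checked by hand, here is a hypothetical sketch, not the patch's actual code, using parquet-mr's `ParquetFileReader`; the helper name and the empty-stats check are assumptions:

    import scala.collection.JavaConverters._

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.Path
    import org.apache.parquet.hadoop.ParquetFileReader

    // Hypothetical check, not the elided test code: read a file's footer and
    // verify no column chunk exposes usable min/max statistics. parquet-mr's
    // metadata converter drops stats for int96 on read (see the two links in
    // the diff), so a pushed-down predicate has nothing to act on.
    def footerStatsEmpty(conf: Configuration, file: Path): Boolean = {
      val footer = ParquetFileReader.readFooter(conf, file)
      footer.getBlocks.asScala.flatMap(_.getColumns.asScala).forall { col =>
        col.getStatistics == null || col.getStatistics.isEmpty
      }
    }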