Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19769#discussion_r151634730 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetInteroperabilitySuite.scala --- @@ -87,4 +95,107 @@ class ParquetInteroperabilitySuite extends ParquetCompatibilityTest with SharedS Row(Seq(2, 3)))) } } + + test("parquet timestamp conversion") { + // Make a table with one parquet file written by impala, and one parquet file written by spark. + // We should only adjust the timestamps in the impala file, and only if the conf is set + val impalaFile = "test-data/impala_timestamp.parq" + + // here are the timestamps in the impala file, as they were saved by impala + val impalaFileData = + Seq( + "2001-01-01 01:01:01", + "2002-02-02 02:02:02", + "2003-03-03 03:03:03" + ).map { s => java.sql.Timestamp.valueOf(s) } + val impalaPath = Thread.currentThread().getContextClassLoader.getResource(impalaFile) + .toURI.getPath + withTempPath { tableDir => + val ts = Seq( + "2004-04-04 04:04:04", + "2005-05-05 05:05:05", + "2006-06-06 06:06:06" + ).map { s => java.sql.Timestamp.valueOf(s) } + import testImplicits._ + // match the column names of the file from impala + val df = spark.createDataset(ts).toDF().repartition(1).withColumnRenamed("value", "ts") + df.write.parquet(tableDir.getAbsolutePath) + FileUtils.copyFile(new File(impalaPath), new File(tableDir, "part-00001.parq")) + + Seq(false, true).foreach { int96TimestampConversion => + Seq(false, true).foreach { vectorized => + withSQLConf( + (SQLConf.PARQUET_INT96_TIMESTAMP_CONVERSION.key, int96TimestampConversion.toString()), + (SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key, vectorized.toString()) + ) { + val readBack = spark.read.parquet(tableDir.getAbsolutePath).collect() + assert(readBack.size === 6) + // if we apply the conversion, we'll get the "right" values, as saved by impala in the + // original file. 
Otherwise, they're off by the local timezone offset, set to + // America/Los_Angeles in tests + val impalaExpectations = if (int96TimestampConversion) { + impalaFileData + } else { + impalaFileData.map { ts => + DateTimeUtils.toJavaTimestamp(DateTimeUtils.convertTz( + DateTimeUtils.fromJavaTimestamp(ts), + DateTimeUtils.TimeZoneUTC, + DateTimeUtils.getTimeZone(conf.sessionLocalTimeZone))) + } + } + val fullExpectations = (ts ++ impalaExpectations).map(_.toString).sorted.toArray + val actual = readBack.map(_.getTimestamp(0).toString).sorted + withClue(s"applyConversion = $int96TimestampConversion; vectorized = $vectorized") { + assert(fullExpectations === actual) + + // Now test that the behavior is still correct even with a filter which could get + // pushed down into parquet. We don't need extra handling for pushed down + // predicates because (a) in ParquetFilters, we ignore TimestampType and (b) parquet + // does not read statistics from int96 fields, as they are unsigned. See + // scalastyle:off line.size.limit + // https://github.com/apache/parquet-mr/blob/2fd62ee4d524c270764e9b91dca72e5cf1a005b7/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java#L419 + // https://github.com/apache/parquet-mr/blob/2fd62ee4d524c270764e9b91dca72e5cf1a005b7/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java#L348 + // scalastyle:on line.size.limit + // + // Just to be defensive in case anything ever changes in parquet, this test checks + // the assumption on column stats, and also the end-to-end behavior. + + val hadoopConf = sparkContext.hadoopConfiguration + val fs = FileSystem.get(hadoopConf) + val parts = fs.listStatus(new Path(tableDir.getAbsolutePath), new PathFilter { + override def accept(path: Path): Boolean = !path.getName.startsWith("_") + }) + // grab the meta data from the parquet file. The next section of asserts just make + // sure the test is configured correctly. 
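+ assert(parts.size == 2) + parts.map { part => --- End diff -- Suggest changing `map` to `foreach` here: the collection returned by `map` does not appear to be used, so `foreach` states the side-effect-only intent and avoids building a throwaway result.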
--- --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org