Github user henryr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19769#discussion_r151830301
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetInteroperabilitySuite.scala ---
    @@ -87,4 +95,109 @@ class ParquetInteroperabilitySuite extends ParquetCompatibilityTest with SharedS
               Row(Seq(2, 3))))
         }
       }
    +
    +  test("parquet timestamp conversion") {
    +    // Make a table with one parquet file written by impala, and one parquet file written by spark.
    +    // We should only adjust the timestamps in the impala file, and only if the conf is set
    +    val impalaFile = "test-data/impala_timestamp.parq"
    +
    +    // here are the timestamps in the impala file, as they were saved by impala
    +    val impalaFileData =
    +      Seq(
    +        "2001-01-01 01:01:01",
    +        "2002-02-02 02:02:02",
    +        "2003-03-03 03:03:03"
    +      ).map(java.sql.Timestamp.valueOf)
    +    val impalaPath = Thread.currentThread().getContextClassLoader.getResource(impalaFile)
    +      .toURI.getPath
    +    withTempPath { tableDir =>
    +      val ts = Seq(
    +        "2004-04-04 04:04:04",
    +        "2005-05-05 05:05:05",
    +        "2006-06-06 06:06:06"
    +      ).map { s => java.sql.Timestamp.valueOf(s) }
    +      import testImplicits._
    +      // match the column names of the file from impala
    +      val df = spark.createDataset(ts).toDF().repartition(1).withColumnRenamed("value", "ts")
    +      df.write.parquet(tableDir.getAbsolutePath)
    +      FileUtils.copyFile(new File(impalaPath), new File(tableDir, "part-00001.parq"))
    +
    +      Seq(false, true).foreach { int96TimestampConversion =>
    +        Seq(false, true).foreach { vectorized =>
    +          withSQLConf(
    +              (SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key,
    +                SQLConf.ParquetOutputTimestampType.INT96.toString),
    +              (SQLConf.PARQUET_INT96_TIMESTAMP_CONVERSION.key, int96TimestampConversion.toString()),
    +              (SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key, vectorized.toString())
    +          ) {
    +            val readBack = spark.read.parquet(tableDir.getAbsolutePath).collect()
    +            assert(readBack.size === 6)
    +            // if we apply the conversion, we'll get the "right" values, as saved by impala in the
    +            // original file.  Otherwise, they're off by the local timezone offset, set to
    +            // America/Los_Angeles in tests
    +            val impalaExpectations = if (int96TimestampConversion) {
    +              impalaFileData
    +            } else {
    +              impalaFileData.map { ts =>
    +                DateTimeUtils.toJavaTimestamp(DateTimeUtils.convertTz(
    +                  DateTimeUtils.fromJavaTimestamp(ts),
    +                  DateTimeUtils.TimeZoneUTC,
    +                  DateTimeUtils.getTimeZone(conf.sessionLocalTimeZone)))
    +              }
    +            }
    +            val fullExpectations = (ts ++ impalaExpectations).map(_.toString).sorted.toArray
    +            val actual = readBack.map(_.getTimestamp(0).toString).sorted
    +            withClue(s"applyConversion = $int96TimestampConversion; vectorized = $vectorized") {
    +              assert(fullExpectations === actual)
    +
    +              // Now test that the behavior is still correct even with a filter which could get
    +              // pushed down into parquet.  We don't need extra handling for pushed down
    +              // predicates because (a) in ParquetFilters, we ignore TimestampType and (b) parquet
    --- End diff --
    
    How about adding an assertion or comment to ParquetFilters stating that it would be unsafe to add timestamp support?
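    
    Something like the following, perhaps? This is only a rough sketch of the idea, not the real ParquetFilters structure: `TimestampPushdownGuard`, the `createFilter` signature, and the `Option[String]` return type are illustrative stand-ins.
    
    ```scala
    import org.apache.spark.sql.sources
    import org.apache.spark.sql.types._
    
    // Hypothetical sketch: make the TimestampType omission explicit, so nobody
    // adds timestamp pushdown later without handling the INT96 conversion.
    object TimestampPushdownGuard {
      def createFilter(dataType: DataType, predicate: sources.Filter): Option[String] = {
        dataType match {
          // Deliberately unsupported: INT96 timestamps may be adjusted at read
          // time (see SQLConf.PARQUET_INT96_TIMESTAMP_CONVERSION), so a predicate
          // evaluated inside parquet would compare against unadjusted values and
          // could silently drop matching rows.
          case TimestampType => None
          // Stand-in for the real per-type FilterPredicate construction.
          case _ => Some(predicate.toString)
        }
      }
    }
    ```
    
    That way the rationale in the test comment above also lives next to the code it protects.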


---
