Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19769#discussion_r151634730
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetInteroperabilitySuite.scala ---
    @@ -87,4 +95,107 @@ class ParquetInteroperabilitySuite extends ParquetCompatibilityTest with SharedS
               Row(Seq(2, 3))))
         }
       }
    +
    +  test("parquet timestamp conversion") {
    +    // Make a table with one parquet file written by impala, and one parquet file written by spark.
    +    // We should only adjust the timestamps in the impala file, and only if the conf is set
    +    val impalaFile = "test-data/impala_timestamp.parq"
    +
    +    // here are the timestamps in the impala file, as they were saved by impala
    +    val impalaFileData =
    +      Seq(
    +        "2001-01-01 01:01:01",
    +        "2002-02-02 02:02:02",
    +        "2003-03-03 03:03:03"
    +      ).map { s => java.sql.Timestamp.valueOf(s) }
    +    val impalaPath = Thread.currentThread().getContextClassLoader.getResource(impalaFile)
    +      .toURI.getPath
    +    withTempPath { tableDir =>
    +      val ts = Seq(
    +        "2004-04-04 04:04:04",
    +        "2005-05-05 05:05:05",
    +        "2006-06-06 06:06:06"
    +      ).map { s => java.sql.Timestamp.valueOf(s) }
    +      import testImplicits._
    +      // match the column names of the file from impala
    +      val df = spark.createDataset(ts).toDF().repartition(1).withColumnRenamed("value", "ts")
    +      df.write.parquet(tableDir.getAbsolutePath)
    +      FileUtils.copyFile(new File(impalaPath), new File(tableDir, "part-00001.parq"))
    +
    +      Seq(false, true).foreach { int96TimestampConversion =>
    +        Seq(false, true).foreach { vectorized =>
    +          withSQLConf(
    +              (SQLConf.PARQUET_INT96_TIMESTAMP_CONVERSION.key, int96TimestampConversion.toString()),
    +              (SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key, vectorized.toString())
    +          ) {
    +            val readBack = spark.read.parquet(tableDir.getAbsolutePath).collect()
    +            assert(readBack.size === 6)
    +            // if we apply the conversion, we'll get the "right" values, as saved by impala in the
    +            // original file.  Otherwise, they're off by the local timezone offset, set to
    +            // America/Los_Angeles in tests
    +            val impalaExpectations = if (int96TimestampConversion) {
    +              impalaFileData
    +            } else {
    +              impalaFileData.map { ts =>
    +                DateTimeUtils.toJavaTimestamp(DateTimeUtils.convertTz(
    +                  DateTimeUtils.fromJavaTimestamp(ts),
    +                  DateTimeUtils.TimeZoneUTC,
    +                  DateTimeUtils.getTimeZone(conf.sessionLocalTimeZone)))
    +              }
    +            }
    +            val fullExpectations = (ts ++ impalaExpectations).map(_.toString).sorted.toArray
    +            val actual = readBack.map(_.getTimestamp(0).toString).sorted
    +            withClue(s"applyConversion = $int96TimestampConversion; vectorized = $vectorized") {
    +              assert(fullExpectations === actual)
    +
    +              // Now test that the behavior is still correct even with a filter which could get
    +              // pushed down into parquet.  We don't need extra handling for pushed down
    +              // predicates because (a) in ParquetFilters, we ignore TimestampType and (b) parquet
    +              // does not read statistics from int96 fields, as they are unsigned.  See
    +              // scalastyle:off line.size.limit
    +              // https://github.com/apache/parquet-mr/blob/2fd62ee4d524c270764e9b91dca72e5cf1a005b7/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java#L419
    +              // https://github.com/apache/parquet-mr/blob/2fd62ee4d524c270764e9b91dca72e5cf1a005b7/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java#L348
    +              // scalastyle:on line.size.limit
    +              //
    +              // Just to be defensive in case anything ever changes in parquet, this test checks
    +              // the assumption on column stats, and also the end-to-end behavior.
    +
    +              val hadoopConf = sparkContext.hadoopConfiguration
    +              val fs = FileSystem.get(hadoopConf)
    +              val parts = fs.listStatus(new Path(tableDir.getAbsolutePath), new PathFilter {
    +                override def accept(path: Path): Boolean = !path.getName.startsWith("_")
    +              })
    +              // grab the meta data from the parquet file.  The next section of asserts just make
    +              // sure the test is configured correctly.
    +              assert(parts.size == 2)
    +              parts.map { part =>
    --- End diff --
    
    `map` -> `foreach`
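    
    Since the result of the mapping is discarded here, `foreach` states the intent more clearly and avoids building a result collection that is never used. A rough sketch of the suggested change (the loop body below is illustrative, not the exact assertions from the diff):
    
    ```scala
    // Illustrative sketch only: iterate over the part-files purely for their
    // side effects (assertions) instead of mapping to an unused collection.
    parts.foreach { part =>
      // hypothetical per-file check; the real test asserts on parquet footer metadata
      assert(!part.getPath.getName.startsWith("_"))
    }
    ```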


---
