This is an automated email from the ASF dual-hosted git repository. maxgekk pushed a commit to branch branch-3.2 in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.2 by this push: new 57849a5 [SPARK-36034][SQL] Rebase datetime in pushed down filters to parquet 57849a5 is described below commit 57849a54da39c337fc7209404815e8980e83e03b Author: Max Gekk <max.g...@gmail.com> AuthorDate: Thu Jul 15 22:21:57 2021 +0300 [SPARK-36034][SQL] Rebase datetime in pushed down filters to parquet ### What changes were proposed in this pull request? In the PR, I propose to propagate either the SQL config `spark.sql.parquet.datetimeRebaseModeInRead` or/and Parquet option `datetimeRebaseMode` to `ParquetFilters`. The `ParquetFilters` class uses the settings in conversions of dates/timestamps instances from datasource filters to values pushed via `FilterApi` to the `parquet-column` lib. Before the changes, date/timestamp values expressed as days/microseconds/milliseconds are interpreted as offsets in Proleptic Gregorian calendar, and pushed to the parquet library as is. That works fine if timestamp/dates values in parquet files were saved in the `CORRECTED` mode but in the `LEGACY` mode, filter's values could not match to actual values. After the changes, timestamp/dates values of filters pushed down to parquet libs such as `FilterApi.eq(col1, -719162)` are rebased according the rebase settings. For the example, if the rebase mode is `CORRECTED`, **-719162** is pushed down as is but if the current rebase mode is `LEGACY`, the number of days is rebased to **-719164**. For more context, the PR description https://github.com/apache/spark/pull/28067 shows the diffs between two calendars. ### Why are the changes needed? The changes fix the bug portrayed by the following example from SPARK-36034: ```scala In [27]: spark.conf.set("spark.sql.legacy.parquet.datetimeRebaseModeInWrite", "LEGACY") >>> spark.sql("SELECT DATE '0001-01-01' AS date").write.mode("overwrite").parquet("date_written_by_spark3_legacy") >>> spark.read.parquet("date_written_by_spark3_legacy").where("date = '0001-01-01'").show() +----+ |date| +----+ +----+ ``` The result must have the date value `0001-01-01`. ### Does this PR introduce _any_ user-facing change? In some sense, yes. Query results can be different in some cases. For the example above: ```scala scala> spark.conf.set("spark.sql.parquet.datetimeRebaseModeInWrite", "LEGACY") scala> spark.sql("SELECT DATE '0001-01-01' AS date").write.mode("overwrite").parquet("date_written_by_spark3_legacy") scala> spark.read.parquet("date_written_by_spark3_legacy").where("date = '0001-01-01'").show(false) +----------+ |date | +----------+ |0001-01-01| +----------+ ``` ### How was this patch tested? By running the modified test suite `ParquetFilterSuite`: ``` $ build/sbt "test:testOnly *ParquetV1FilterSuite" $ build/sbt "test:testOnly *ParquetV2FilterSuite" ``` Closes #33347 from MaxGekk/fix-parquet-ts-filter-pushdown. Authored-by: Max Gekk <max.g...@gmail.com> Signed-off-by: Max Gekk <max.g...@gmail.com> (cherry picked from commit b09b7f7cc024d3054debd7bdb51caec3b53764d7) Signed-off-by: Max Gekk <max.g...@gmail.com> --- .../datasources/parquet/ParquetFileFormat.scala | 17 ++- .../datasources/parquet/ParquetFilters.scala | 29 +++- .../v2/parquet/ParquetPartitionReaderFactory.scala | 17 ++- .../v2/parquet/ParquetScanBuilder.scala | 14 +- .../datasources/parquet/ParquetFilterSuite.scala | 164 +++++++++++---------- 5 files changed, 145 insertions(+), 96 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala index 48e2e6e..ee229a3 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala @@ -266,11 +266,21 @@ class ParquetFileFormat lazy val footerFileMetaData = ParquetFooterReader.readFooter(sharedConf, filePath, SKIP_ROW_GROUPS).getFileMetaData + val datetimeRebaseMode = DataSourceUtils.datetimeRebaseMode( + footerFileMetaData.getKeyValueMetaData.get, + datetimeRebaseModeInRead) // Try to push down filters when filter push-down is enabled. val pushed = if (enableParquetFilterPushDown) { val parquetSchema = footerFileMetaData.getSchema - val parquetFilters = new ParquetFilters(parquetSchema, pushDownDate, pushDownTimestamp, - pushDownDecimal, pushDownStringStartWith, pushDownInFilterThreshold, isCaseSensitive) + val parquetFilters = new ParquetFilters( + parquetSchema, + pushDownDate, + pushDownTimestamp, + pushDownDecimal, + pushDownStringStartWith, + pushDownInFilterThreshold, + isCaseSensitive, + datetimeRebaseMode) filters // Collects all converted Parquet filter predicates. Notice that not all predicates can be // converted (`ParquetFilters.createFilter` returns an `Option`). That's why a `flatMap` @@ -296,9 +306,6 @@ class ParquetFileFormat None } - val datetimeRebaseMode = DataSourceUtils.datetimeRebaseMode( - footerFileMetaData.getKeyValueMetaData.get, - datetimeRebaseModeInRead) val int96RebaseMode = DataSourceUtils.int96RebaseMode( footerFileMetaData.getKeyValueMetaData.get, int96RebaseModeInRead) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala index 9999df7..5afb736 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala @@ -35,6 +35,8 @@ import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName._ import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils} +import org.apache.spark.sql.catalyst.util.RebaseDateTime.{rebaseGregorianToJulianDays, rebaseGregorianToJulianMicros} +import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy import org.apache.spark.sql.sources import org.apache.spark.unsafe.types.UTF8String @@ -48,7 +50,8 @@ class ParquetFilters( pushDownDecimal: Boolean, pushDownStartWith: Boolean, pushDownInFilterThreshold: Int, - caseSensitive: Boolean) { + caseSensitive: Boolean, + datetimeRebaseMode: LegacyBehaviorPolicy.Value) { // A map which contains parquet field name and data type, if predicate push down applies. // // Each key in `nameToParquetField` represents a column; `dots` are used as separators for @@ -129,14 +132,26 @@ class ParquetFilters( private val ParquetTimestampMillisType = ParquetSchemaType(LogicalTypeAnnotation.timestampType(true, TimeUnit.MILLIS), INT64, 0) - private def dateToDays(date: Any): Int = date match { - case d: Date => DateTimeUtils.fromJavaDate(d) - case ld: LocalDate => DateTimeUtils.localDateToDays(ld) + private def dateToDays(date: Any): Int = { + val gregorianDays = date match { + case d: Date => DateTimeUtils.fromJavaDate(d) + case ld: LocalDate => DateTimeUtils.localDateToDays(ld) + } + datetimeRebaseMode match { + case LegacyBehaviorPolicy.LEGACY => rebaseGregorianToJulianDays(gregorianDays) + case _ => gregorianDays + } } - private def timestampToMicros(v: Any): JLong = v match { - case i: Instant => DateTimeUtils.instantToMicros(i) - case t: Timestamp => DateTimeUtils.fromJavaTimestamp(t) + private def timestampToMicros(v: Any): JLong = { + val gregorianMicros = v match { + case i: Instant => DateTimeUtils.instantToMicros(i) + case t: Timestamp => DateTimeUtils.fromJavaTimestamp(t) + } + datetimeRebaseMode match { + case LegacyBehaviorPolicy.LEGACY => rebaseGregorianToJulianMicros(gregorianMicros) + case _ => gregorianMicros + } } private def decimalToInt32(decimal: JBigDecimal): Integer = decimal.unscaledValue().intValue() diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetPartitionReaderFactory.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetPartitionReaderFactory.scala index 7807604..058669b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetPartitionReaderFactory.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetPartitionReaderFactory.scala @@ -133,11 +133,21 @@ case class ParquetPartitionReaderFactory( lazy val footerFileMetaData = ParquetFooterReader.readFooter(conf, filePath, SKIP_ROW_GROUPS).getFileMetaData + val datetimeRebaseMode = DataSourceUtils.datetimeRebaseMode( + footerFileMetaData.getKeyValueMetaData.get, + datetimeRebaseModeInRead) // Try to push down filters when filter push-down is enabled. val pushed = if (enableParquetFilterPushDown) { val parquetSchema = footerFileMetaData.getSchema - val parquetFilters = new ParquetFilters(parquetSchema, pushDownDate, pushDownTimestamp, - pushDownDecimal, pushDownStringStartWith, pushDownInFilterThreshold, isCaseSensitive) + val parquetFilters = new ParquetFilters( + parquetSchema, + pushDownDate, + pushDownTimestamp, + pushDownDecimal, + pushDownStringStartWith, + pushDownInFilterThreshold, + isCaseSensitive, + datetimeRebaseMode) filters // Collects all converted Parquet filter predicates. Notice that not all predicates can be // converted (`ParquetFilters.createFilter` returns an `Option`). That's why a `flatMap` @@ -170,9 +180,6 @@ case class ParquetPartitionReaderFactory( if (pushed.isDefined) { ParquetInputFormat.setFilterPredicate(hadoopAttemptContext.getConfiguration, pushed.get) } - val datetimeRebaseMode = DataSourceUtils.datetimeRebaseMode( - footerFileMetaData.getKeyValueMetaData.get, - datetimeRebaseModeInRead) val int96RebaseMode = DataSourceUtils.int96RebaseMode( footerFileMetaData.getKeyValueMetaData.get, int96RebaseModeInRead) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetScanBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetScanBuilder.scala index 4405383..4b3f4e7 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetScanBuilder.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetScanBuilder.scala @@ -24,6 +24,7 @@ import org.apache.spark.sql.connector.read.{Scan, SupportsPushDownFilters} import org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex import org.apache.spark.sql.execution.datasources.parquet.{ParquetFilters, SparkToParquetSchemaConverter} import org.apache.spark.sql.execution.datasources.v2.FileScanBuilder +import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy import org.apache.spark.sql.sources.Filter import org.apache.spark.sql.types.StructType import org.apache.spark.sql.util.CaseInsensitiveStringMap @@ -51,8 +52,17 @@ case class ParquetScanBuilder( val isCaseSensitive = sqlConf.caseSensitiveAnalysis val parquetSchema = new SparkToParquetSchemaConverter(sparkSession.sessionState.conf).convert(readDataSchema()) - val parquetFilters = new ParquetFilters(parquetSchema, pushDownDate, pushDownTimestamp, - pushDownDecimal, pushDownStringStartWith, pushDownInFilterThreshold, isCaseSensitive) + val parquetFilters = new ParquetFilters( + parquetSchema, + pushDownDate, + pushDownTimestamp, + pushDownDecimal, + pushDownStringStartWith, + pushDownInFilterThreshold, + isCaseSensitive, + // The rebase mode doesn't matter here because the filters are used to determine + // whether they is convertible. + LegacyBehaviorPolicy.CORRECTED) parquetFilters.convertibleFilters(this.filters).toArray } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala index 230d547..8354158 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala @@ -45,7 +45,9 @@ import org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanRelation import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetScan import org.apache.spark.sql.functions._ import org.apache.spark.sql.internal.SQLConf -import org.apache.spark.sql.internal.SQLConf.ParquetOutputTimestampType +import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy +import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy.{CORRECTED, LEGACY} +import org.apache.spark.sql.internal.SQLConf.ParquetOutputTimestampType.{INT96, TIMESTAMP_MICROS, TIMESTAMP_MILLIS} import org.apache.spark.sql.test.SharedSparkSession import org.apache.spark.sql.types._ import org.apache.spark.tags.ExtendedSQLTest @@ -73,11 +75,14 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared protected def createParquetFilters( schema: MessageType, - caseSensitive: Option[Boolean] = None): ParquetFilters = + caseSensitive: Option[Boolean] = None, + datetimeRebaseMode: LegacyBehaviorPolicy.Value = LegacyBehaviorPolicy.CORRECTED + ): ParquetFilters = new ParquetFilters(schema, conf.parquetFilterPushDownDate, conf.parquetFilterPushDownTimestamp, conf.parquetFilterPushDownDecimal, conf.parquetFilterPushDownStringStartWith, conf.parquetFilterPushDownInFilterThreshold, - caseSensitive.getOrElse(conf.caseSensitiveAnalysis)) + caseSensitive.getOrElse(conf.caseSensitiveAnalysis), + datetimeRebaseMode) override def beforeEach(): Unit = { super.beforeEach() @@ -587,71 +592,75 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared def date: Date = Date.valueOf(s) } - val data = Seq("2018-03-18", "2018-03-19", "2018-03-20", "2018-03-21") + val data = Seq("1000-01-01", "2018-03-19", "2018-03-20", "2018-03-21") import testImplicits._ Seq(false, true).foreach { java8Api => - withSQLConf(SQLConf.DATETIME_JAVA8API_ENABLED.key -> java8Api.toString) { - val dates = data.map(i => Tuple1(Date.valueOf(i))).toDF() - withNestedParquetDataFrame(dates) { case (inputDF, colName, fun) => - implicit val df: DataFrame = inputDF - - def resultFun(dateStr: String): Any = { - val parsed = if (java8Api) LocalDate.parse(dateStr) else Date.valueOf(dateStr) - fun(parsed) - } - - val dateAttr: Expression = df(colName).expr - assert(df(colName).expr.dataType === DateType) - - checkFilterPredicate(dateAttr.isNull, classOf[Eq[_]], Seq.empty[Row]) - checkFilterPredicate(dateAttr.isNotNull, classOf[NotEq[_]], - data.map(i => Row.apply(resultFun(i)))) - - checkFilterPredicate(dateAttr === "2018-03-18".date, classOf[Eq[_]], - resultFun("2018-03-18")) - checkFilterPredicate(dateAttr <=> "2018-03-18".date, classOf[Eq[_]], - resultFun("2018-03-18")) - checkFilterPredicate(dateAttr =!= "2018-03-18".date, classOf[NotEq[_]], - Seq("2018-03-19", "2018-03-20", "2018-03-21").map(i => Row.apply(resultFun(i)))) - - checkFilterPredicate(dateAttr < "2018-03-19".date, classOf[Lt[_]], - resultFun("2018-03-18")) - checkFilterPredicate(dateAttr > "2018-03-20".date, classOf[Gt[_]], - resultFun("2018-03-21")) - checkFilterPredicate(dateAttr <= "2018-03-18".date, classOf[LtEq[_]], - resultFun("2018-03-18")) - checkFilterPredicate(dateAttr >= "2018-03-21".date, classOf[GtEq[_]], - resultFun("2018-03-21")) - - checkFilterPredicate(Literal("2018-03-18".date) === dateAttr, classOf[Eq[_]], - resultFun("2018-03-18")) - checkFilterPredicate(Literal("2018-03-18".date) <=> dateAttr, classOf[Eq[_]], - resultFun("2018-03-18")) - checkFilterPredicate(Literal("2018-03-19".date) > dateAttr, classOf[Lt[_]], - resultFun("2018-03-18")) - checkFilterPredicate(Literal("2018-03-20".date) < dateAttr, classOf[Gt[_]], - resultFun("2018-03-21")) - checkFilterPredicate(Literal("2018-03-18".date) >= dateAttr, classOf[LtEq[_]], - resultFun("2018-03-18")) - checkFilterPredicate(Literal("2018-03-21".date) <= dateAttr, classOf[GtEq[_]], - resultFun("2018-03-21")) - - checkFilterPredicate(!(dateAttr < "2018-03-21".date), classOf[GtEq[_]], - resultFun("2018-03-21")) - checkFilterPredicate( - dateAttr < "2018-03-19".date || dateAttr > "2018-03-20".date, - classOf[Operators.Or], - Seq(Row(resultFun("2018-03-18")), Row(resultFun("2018-03-21")))) + Seq(CORRECTED, LEGACY).foreach { rebaseMode => + withSQLConf( + SQLConf.DATETIME_JAVA8API_ENABLED.key -> java8Api.toString, + SQLConf.PARQUET_REBASE_MODE_IN_WRITE.key -> rebaseMode.toString) { + val dates = data.map(i => Tuple1(Date.valueOf(i))).toDF() + withNestedParquetDataFrame(dates) { case (inputDF, colName, fun) => + implicit val df: DataFrame = inputDF + + def resultFun(dateStr: String): Any = { + val parsed = if (java8Api) LocalDate.parse(dateStr) else Date.valueOf(dateStr) + fun(parsed) + } - Seq(3, 20).foreach { threshold => - withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_INFILTERTHRESHOLD.key -> s"$threshold") { - checkFilterPredicate( - In(dateAttr, Array("2018-03-19".date, "2018-03-20".date, "2018-03-21".date, - "2018-03-22".date).map(Literal.apply)), - if (threshold == 3) classOf[Operators.And] else classOf[Operators.Or], - Seq(Row(resultFun("2018-03-19")), Row(resultFun("2018-03-20")), - Row(resultFun("2018-03-21")))) + val dateAttr: Expression = df(colName).expr + assert(df(colName).expr.dataType === DateType) + + checkFilterPredicate(dateAttr.isNull, classOf[Eq[_]], Seq.empty[Row]) + checkFilterPredicate(dateAttr.isNotNull, classOf[NotEq[_]], + data.map(i => Row.apply(resultFun(i)))) + + checkFilterPredicate(dateAttr === "1000-01-01".date, classOf[Eq[_]], + resultFun("1000-01-01")) + checkFilterPredicate(dateAttr <=> "1000-01-01".date, classOf[Eq[_]], + resultFun("1000-01-01")) + checkFilterPredicate(dateAttr =!= "1000-01-01".date, classOf[NotEq[_]], + Seq("2018-03-19", "2018-03-20", "2018-03-21").map(i => Row.apply(resultFun(i)))) + + checkFilterPredicate(dateAttr < "2018-03-19".date, classOf[Lt[_]], + resultFun("1000-01-01")) + checkFilterPredicate(dateAttr > "2018-03-20".date, classOf[Gt[_]], + resultFun("2018-03-21")) + checkFilterPredicate(dateAttr <= "1000-01-01".date, classOf[LtEq[_]], + resultFun("1000-01-01")) + checkFilterPredicate(dateAttr >= "2018-03-21".date, classOf[GtEq[_]], + resultFun("2018-03-21")) + + checkFilterPredicate(Literal("1000-01-01".date) === dateAttr, classOf[Eq[_]], + resultFun("1000-01-01")) + checkFilterPredicate(Literal("1000-01-01".date) <=> dateAttr, classOf[Eq[_]], + resultFun("1000-01-01")) + checkFilterPredicate(Literal("2018-03-19".date) > dateAttr, classOf[Lt[_]], + resultFun("1000-01-01")) + checkFilterPredicate(Literal("2018-03-20".date) < dateAttr, classOf[Gt[_]], + resultFun("2018-03-21")) + checkFilterPredicate(Literal("1000-01-01".date) >= dateAttr, classOf[LtEq[_]], + resultFun("1000-01-01")) + checkFilterPredicate(Literal("2018-03-21".date) <= dateAttr, classOf[GtEq[_]], + resultFun("2018-03-21")) + + checkFilterPredicate(!(dateAttr < "2018-03-21".date), classOf[GtEq[_]], + resultFun("2018-03-21")) + checkFilterPredicate( + dateAttr < "2018-03-19".date || dateAttr > "2018-03-20".date, + classOf[Operators.Or], + Seq(Row(resultFun("1000-01-01")), Row(resultFun("2018-03-21")))) + + Seq(3, 20).foreach { threshold => + withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_INFILTERTHRESHOLD.key -> s"$threshold") { + checkFilterPredicate( + In(dateAttr, Array("2018-03-19".date, "2018-03-20".date, "2018-03-21".date, + "2018-03-22".date).map(Literal.apply)), + if (threshold == 3) classOf[Operators.And] else classOf[Operators.Or], + Seq(Row(resultFun("2018-03-19")), Row(resultFun("2018-03-20")), + Row(resultFun("2018-03-21")))) + } } } } @@ -661,35 +670,36 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared test("filter pushdown - timestamp") { Seq(true, false).foreach { java8Api => - withSQLConf( - SQLConf.DATETIME_JAVA8API_ENABLED.key -> java8Api.toString, - SQLConf.PARQUET_REBASE_MODE_IN_WRITE.key -> "CORRECTED", - SQLConf.PARQUET_INT96_REBASE_MODE_IN_WRITE.key -> "CORRECTED") { - // spark.sql.parquet.outputTimestampType = TIMESTAMP_MILLIS + Seq(CORRECTED, LEGACY).foreach { rebaseMode => val millisData = Seq( "1000-06-14 08:28:53.123", "1582-06-15 08:28:53.001", "1900-06-16 08:28:53.0", "2018-06-17 08:28:53.999") - withSQLConf(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key -> - ParquetOutputTimestampType.TIMESTAMP_MILLIS.toString) { + withSQLConf( + SQLConf.DATETIME_JAVA8API_ENABLED.key -> java8Api.toString, + SQLConf.PARQUET_REBASE_MODE_IN_WRITE.key -> rebaseMode.toString, + SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key -> TIMESTAMP_MILLIS.toString) { testTimestampPushdown(millisData, java8Api) } - // spark.sql.parquet.outputTimestampType = TIMESTAMP_MICROS val microsData = Seq( "1000-06-14 08:28:53.123456", "1582-06-15 08:28:53.123456", "1900-06-16 08:28:53.123456", "2018-06-17 08:28:53.123456") - withSQLConf(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key -> - ParquetOutputTimestampType.TIMESTAMP_MICROS.toString) { + withSQLConf( + SQLConf.DATETIME_JAVA8API_ENABLED.key -> java8Api.toString, + SQLConf.PARQUET_REBASE_MODE_IN_WRITE.key -> rebaseMode.toString, + SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key -> TIMESTAMP_MICROS.toString) { testTimestampPushdown(microsData, java8Api) } - // spark.sql.parquet.outputTimestampType = INT96 doesn't support pushdown - withSQLConf(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key -> - ParquetOutputTimestampType.INT96.toString) { + // INT96 doesn't support pushdown + withSQLConf( + SQLConf.DATETIME_JAVA8API_ENABLED.key -> java8Api.toString, + SQLConf.PARQUET_INT96_REBASE_MODE_IN_WRITE.key -> rebaseMode.toString, + SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key -> INT96.toString) { import testImplicits._ withTempPath { file => millisData.map(i => Tuple1(Timestamp.valueOf(i))).toDF --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org