This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.3 by this push:
     new 3ec9b05a23c  [SPARK-40819][SQL][3.3] Timestamp nanos behaviour regression
3ec9b05a23c is described below

commit 3ec9b05a23cc780438772d847b2fab19aab2d60a
Author: alfreddavidson <alfie.davids...@gmail.com>
AuthorDate: Wed Feb 8 11:07:25 2023 +0900

[SPARK-40819][SQL][3.3] Timestamp nanos behaviour regression

As per HyukjinKwon's request on https://github.com/apache/spark/pull/38312, this backports the fix into 3.3.

### What changes were proposed in this pull request?

Handle `TimeUnit.NANOS` for parquet `Timestamp`s, addressing a behaviour regression introduced in 3.2.

### Why are the changes needed?

Since version 3.2, reading parquet files that contain attributes of type `TIMESTAMP(NANOS,true)` has not been possible: `ParquetSchemaConverter` fails with

```
Caused by: org.apache.spark.sql.AnalysisException: Illegal Parquet type: INT64 (TIMESTAMP(NANOS,true))
```

https://issues.apache.org/jira/browse/SPARK-34661 introduced matching on the `LogicalTypeAnnotation`, which only covers the Timestamp cases `TimeUnit.MILLIS` and `TimeUnit.MICROS`, so `TimeUnit.NANOS` falls through to `illegalType()`.

Prior to 3.2 the matching used the `originalType`, which for `TIMESTAMP(NANOS,true)` returned `null` and therefore resulted in a `LongType`. The proposed change is to also consider `TimeUnit.NANOS` and return `LongType` (guarded in this backport by the new legacy flag `spark.sql.legacy.parquet.nanosAsLong`), making the behaviour the same as before.
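For illustration, a minimal usage sketch of the restored behaviour, assuming a parquet file whose first column was written as `TIMESTAMP(NANOS,true)`; the file path is a placeholder, not part of this patch:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.LongType

val spark = SparkSession.builder().master("local[*]").getOrCreate()

// Opt in to the legacy behaviour: expose nanos-precision timestamps as raw
// longs instead of failing schema conversion (the flag defaults to false).
spark.conf.set("spark.sql.legacy.parquet.nanosAsLong", "true")

// Placeholder path: any file containing a TIMESTAMP(NANOS,true) column.
val df = spark.read.parquet("/tmp/timestamp-nanos.parquet")

// Assuming the nanos column is the first field, it is read as LongType
// rather than raising "Illegal Parquet type: INT64 (TIMESTAMP(NANOS,true))".
assert(df.schema.fields.head.dataType == LongType)
```

With the flag left at its default of `false`, the same read still fails with the `AnalysisException` above, as exercised by the new `ParquetSchemaSuite` tests.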
### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Added a unit test covering this scenario. Also deployed internally to read parquet files that contain `TIMESTAMP(NANOS,true)`.

Closes #39904 from awdavidson/ts-nanos-fix-3.3.

Lead-authored-by: alfreddavidson <alfie.davids...@gmail.com>
Co-authored-by: awdavidson <54780428+awdavid...@users.noreply.github.com>
Signed-off-by: Hyukjin Kwon <gurwls...@apache.org>
---
 .../org/apache/spark/sql/internal/SQLConf.scala    |  9 +++
 .../parquet/SpecificParquetRecordReaderBase.java   |  2 +
 .../datasources/parquet/ParquetFileFormat.scala    | 14 +++-
 .../parquet/ParquetSchemaConverter.scala           | 15 ++++-
 .../datasources/v2/parquet/ParquetScan.scala       |  4 ++
 .../resources/test-data/timestamp-nanos.parquet    | Bin 0 -> 784 bytes
 .../datasources/parquet/ParquetSchemaSuite.scala   | 72 +++++++++++++++++++--
 7 files changed, 104 insertions(+), 12 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index d9e38ea9258..cab2aad08cc 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -3459,6 +3459,13 @@ object SQLConf {
       .booleanConf
       .createWithDefault(false)
 
+  val LEGACY_PARQUET_NANOS_AS_LONG = buildConf("spark.sql.legacy.parquet.nanosAsLong")
+    .internal()
+    .doc("When true, the Parquet's nanos precision timestamps are converted to SQL long values.")
+    .version("3.2.3")
+    .booleanConf
+    .createWithDefault(false)
+
   val PARQUET_INT96_REBASE_MODE_IN_WRITE =
     buildConf("spark.sql.parquet.int96RebaseModeInWrite")
       .internal()
@@ -4525,6 +4532,8 @@ class SQLConf extends Serializable with Logging {
 
   def ignoreMissingParquetFieldId: Boolean = getConf(SQLConf.IGNORE_MISSING_PARQUET_FIELD_ID)
 
+  def legacyParquetNanosAsLong: Boolean = getConf(SQLConf.LEGACY_PARQUET_NANOS_AS_LONG)
+
   def useV1Command: Boolean = getConf(SQLConf.LEGACY_USE_V1_COMMAND)
 
   def histogramNumericPropagateInputType: Boolean =
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java
index 48016c3fdc0..49dbdf87ae0 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java
@@ -149,6 +149,7 @@ public abstract class SpecificParquetRecordReaderBase<T> extends RecordReader<Vo
     config.setBoolean(SQLConf.PARQUET_BINARY_AS_STRING().key() , false);
     config.setBoolean(SQLConf.PARQUET_INT96_AS_TIMESTAMP().key(), false);
     config.setBoolean(SQLConf.CASE_SENSITIVE().key(), false);
+    config.setBoolean(SQLConf.LEGACY_PARQUET_NANOS_AS_LONG().key(), false);
 
     this.file = new Path(path);
     long length = this.file.getFileSystem(config).getFileStatus(this.file).getLen();
@@ -199,6 +200,7 @@ public abstract class SpecificParquetRecordReaderBase<T> extends RecordReader<Vo
     config.setBoolean(SQLConf.PARQUET_BINARY_AS_STRING().key() , false);
     config.setBoolean(SQLConf.PARQUET_INT96_AS_TIMESTAMP().key(), false);
     config.setBoolean(SQLConf.CASE_SENSITIVE().key(), false);
+    config.setBoolean(SQLConf.LEGACY_PARQUET_NANOS_AS_LONG().key(), false);
     this.parquetColumn = new ParquetToSparkSchemaConverter(config)
       .convertParquetColumn(requestedSchema, Option.empty());
     this.sparkSchema = (StructType) parquetColumn.sparkType();
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
index f66434d3caf..e63881422aa 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
@@ -123,6 +123,10 @@ class ParquetFileFormat
       SQLConf.PARQUET_FIELD_ID_WRITE_ENABLED.key,
       sparkSession.sessionState.conf.parquetFieldIdWriteEnabled.toString)
 
+    conf.set(
+      SQLConf.LEGACY_PARQUET_NANOS_AS_LONG.key,
+      sparkSession.sessionState.conf.legacyParquetNanosAsLong.toString)
+
     // Sets compression scheme
     conf.set(ParquetOutputFormat.COMPRESSION, parquetOptions.compressionCodecClassName)
 
@@ -239,6 +243,9 @@ class ParquetFileFormat
     hadoopConf.setBoolean(
       SQLConf.PARQUET_INT96_AS_TIMESTAMP.key,
       sparkSession.sessionState.conf.isParquetINT96AsTimestamp)
+    hadoopConf.setBoolean(
+      SQLConf.LEGACY_PARQUET_NANOS_AS_LONG.key,
+      sparkSession.sessionState.conf.legacyParquetNanosAsLong)
 
     val broadcastedHadoopConf =
       sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))
@@ -440,7 +447,8 @@ object ParquetFileFormat extends Logging {
 
     val converter = new ParquetToSparkSchemaConverter(
       sparkSession.sessionState.conf.isParquetBinaryAsString,
-      sparkSession.sessionState.conf.isParquetINT96AsTimestamp)
+      sparkSession.sessionState.conf.isParquetINT96AsTimestamp,
+      nanosAsLong = sparkSession.sessionState.conf.legacyParquetNanosAsLong)
 
     val seen = mutable.HashSet[String]()
     val finalSchemas: Seq[StructType] = footers.flatMap { footer =>
@@ -536,12 +544,14 @@ object ParquetFileFormat extends Logging {
       sparkSession: SparkSession): Option[StructType] = {
     val assumeBinaryIsString = sparkSession.sessionState.conf.isParquetBinaryAsString
     val assumeInt96IsTimestamp = sparkSession.sessionState.conf.isParquetINT96AsTimestamp
+    val nanosAsLong = sparkSession.sessionState.conf.legacyParquetNanosAsLong
 
     val reader = (files: Seq[FileStatus], conf: Configuration, ignoreCorruptFiles: Boolean) => {
       // Converter used to convert Parquet `MessageType` to Spark SQL `StructType`
       val converter = new ParquetToSparkSchemaConverter(
         assumeBinaryIsString = assumeBinaryIsString,
-        assumeInt96IsTimestamp = assumeInt96IsTimestamp)
+        assumeInt96IsTimestamp = assumeInt96IsTimestamp,
+        nanosAsLong = nanosAsLong)
 
       readParquetFootersInParallel(conf, files, ignoreCorruptFiles)
         .map(ParquetFileFormat.readSchemaFromFooter(_, converter))
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala
index 3419bf15f8e..d9ccb524777 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala
@@ -49,21 +49,25 @@ import org.apache.spark.util.Utils
  *        [[TimestampType]] fields.
  * @param caseSensitive Whether use case sensitive analysis when comparing Spark catalyst read
  *        schema with Parquet schema
+ * @param nanosAsLong Whether timestamps with nanos are converted to long.
  */
 class ParquetToSparkSchemaConverter(
     assumeBinaryIsString: Boolean = SQLConf.PARQUET_BINARY_AS_STRING.defaultValue.get,
     assumeInt96IsTimestamp: Boolean = SQLConf.PARQUET_INT96_AS_TIMESTAMP.defaultValue.get,
-    caseSensitive: Boolean = SQLConf.CASE_SENSITIVE.defaultValue.get) {
+    caseSensitive: Boolean = SQLConf.CASE_SENSITIVE.defaultValue.get,
+    nanosAsLong: Boolean = SQLConf.LEGACY_PARQUET_NANOS_AS_LONG.defaultValue.get) {
 
   def this(conf: SQLConf) = this(
     assumeBinaryIsString = conf.isParquetBinaryAsString,
     assumeInt96IsTimestamp = conf.isParquetINT96AsTimestamp,
-    caseSensitive = conf.caseSensitiveAnalysis)
+    caseSensitive = conf.caseSensitiveAnalysis,
+    nanosAsLong = conf.legacyParquetNanosAsLong)
 
   def this(conf: Configuration) = this(
     assumeBinaryIsString = conf.get(SQLConf.PARQUET_BINARY_AS_STRING.key).toBoolean,
     assumeInt96IsTimestamp = conf.get(SQLConf.PARQUET_INT96_AS_TIMESTAMP.key).toBoolean,
-    caseSensitive = conf.get(SQLConf.CASE_SENSITIVE.key).toBoolean)
+    caseSensitive = conf.get(SQLConf.CASE_SENSITIVE.key).toBoolean,
+    conf.get(SQLConf.LEGACY_PARQUET_NANOS_AS_LONG.key).toBoolean)
 
   /**
@@ -257,6 +261,11 @@ class ParquetToSparkSchemaConverter(
         // SPARK-38829: Remove TimestampNTZ type support in Parquet for Spark 3.3
         if (Utils.isTesting) TimestampNTZType else TimestampType
       }
+      // SPARK-40819: NANOS are not supported as a Timestamp, convert to LongType without
+      // timezone awareness to address behaviour regression introduced by SPARK-34661
+      case timestamp: TimestampLogicalTypeAnnotation
+        if timestamp.getUnit == TimeUnit.NANOS && nanosAsLong =>
+        LongType
       case _ => illegalType()
     }
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetScan.scala
index 99632d79cd8..0fec27505d9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetScan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetScan.scala
@@ -86,6 +86,10 @@ case class ParquetScan(
       SQLConf.PARQUET_INT96_AS_TIMESTAMP.key,
       sparkSession.sessionState.conf.isParquetINT96AsTimestamp)
 
+    hadoopConf.setBoolean(
+      SQLConf.LEGACY_PARQUET_NANOS_AS_LONG.key,
+      sparkSession.sessionState.conf.legacyParquetNanosAsLong)
+
     val broadcastedConf = sparkSession.sparkContext.broadcast(
       new SerializableConfiguration(hadoopConf))
     val sqlConf = sparkSession.sessionState.conf
diff --git a/sql/core/src/test/resources/test-data/timestamp-nanos.parquet b/sql/core/src/test/resources/test-data/timestamp-nanos.parquet
new file mode 100644
index 00000000000..962aa909b82
Binary files /dev/null and b/sql/core/src/test/resources/test-data/timestamp-nanos.parquet differ
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
index d0228d7bdf9..a62dd53342a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
@@ -30,6 +30,7 @@ import org.apache.spark.SparkException
 import org.apache.spark.sql.catalyst.ScalaReflection
 import org.apache.spark.sql.execution.QueryExecutionException
 import org.apache.spark.sql.execution.datasources.SchemaColumnConvertNotSupportedException
+import org.apache.spark.sql.functions.desc
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.internal.SQLConf.ParquetOutputTimestampType._
 import org.apache.spark.sql.test.SharedSparkSession
@@ -46,7 +47,8 @@ abstract class ParquetSchemaTest extends ParquetTest with SharedSparkSession {
       binaryAsString: Boolean,
       int96AsTimestamp: Boolean,
       writeLegacyParquetFormat: Boolean,
-      expectedParquetColumn: Option[ParquetColumn] = None): Unit = {
+      expectedParquetColumn: Option[ParquetColumn] = None,
+      nanosAsLong: Boolean = false): Unit = {
     testSchema(
       testName,
       StructType.fromAttributes(ScalaReflection.attributesFor[T]),
@@ -54,7 +56,8 @@ abstract class ParquetSchemaTest extends ParquetTest with SharedSparkSession {
       binaryAsString,
       int96AsTimestamp,
       writeLegacyParquetFormat,
-      expectedParquetColumn = expectedParquetColumn)
+      expectedParquetColumn = expectedParquetColumn,
+      nanosAsLong = nanosAsLong)
   }
 
   protected def testParquetToCatalyst(
@@ -65,11 +68,13 @@ abstract class ParquetSchemaTest extends ParquetTest with SharedSparkSession {
       int96AsTimestamp: Boolean,
       caseSensitive: Boolean = false,
       sparkReadSchema: Option[StructType] = None,
-      expectedParquetColumn: Option[ParquetColumn] = None): Unit = {
+      expectedParquetColumn: Option[ParquetColumn] = None,
+      nanosAsLong: Boolean = false): Unit = {
     val converter = new ParquetToSparkSchemaConverter(
       assumeBinaryIsString = binaryAsString,
       assumeInt96IsTimestamp = int96AsTimestamp,
-      caseSensitive = caseSensitive)
+      caseSensitive = caseSensitive,
+      nanosAsLong = nanosAsLong)
 
     test(s"sql <= parquet: $testName") {
       val actualParquetColumn = converter.convertParquetColumn(
@@ -117,7 +122,8 @@ abstract class ParquetSchemaTest extends ParquetTest with SharedSparkSession {
       writeLegacyParquetFormat: Boolean,
       outputTimestampType: SQLConf.ParquetOutputTimestampType.Value =
         SQLConf.ParquetOutputTimestampType.INT96,
-      expectedParquetColumn: Option[ParquetColumn] = None): Unit = {
+      expectedParquetColumn: Option[ParquetColumn] = None,
+      nanosAsLong: Boolean = false): Unit = {
 
     testCatalystToParquet(
       testName,
@@ -132,7 +138,8 @@ abstract class ParquetSchemaTest extends ParquetTest with SharedSparkSession {
       parquetSchema,
       binaryAsString,
       int96AsTimestamp,
-      expectedParquetColumn = expectedParquetColumn)
+      expectedParquetColumn = expectedParquetColumn,
+      nanosAsLong = nanosAsLong)
   }
 
   protected def compareParquetColumn(actual: ParquetColumn, expected: ParquetColumn): Unit = {
@@ -147,7 +154,14 @@ abstract class ParquetSchemaTest extends ParquetTest with SharedSparkSession {
       val expectedDesc = expected.descriptor.get
       assert(actualDesc.getMaxRepetitionLevel == expectedDesc.getMaxRepetitionLevel)
       assert(actualDesc.getMaxRepetitionLevel == expectedDesc.getMaxRepetitionLevel)
-      assert(actualDesc.getPrimitiveType === expectedDesc.getPrimitiveType)
+
+      actualDesc.getPrimitiveType.getLogicalTypeAnnotation match {
+        case timestamp: LogicalTypeAnnotation.TimestampLogicalTypeAnnotation
+          if timestamp.getUnit == LogicalTypeAnnotation.TimeUnit.NANOS =>
+          assert(actual.sparkType == expected.sparkType)
+        case _ =>
+          assert(actualDesc.getPrimitiveType === expectedDesc.getPrimitiveType)
+      }
     }
 
     assert(actual.repetitionLevel == expected.repetitionLevel, "repetition level mismatch: " +
@@ -195,6 +209,32 @@ abstract class ParquetSchemaTest extends ParquetTest with SharedSparkSession {
 }
 
 class ParquetSchemaInferenceSuite extends ParquetSchemaTest {
+  testSchemaInference[Tuple1[Long]](
+    "timestamp nanos",
+    """
+      |message root {
+      |  required int64 _1 (TIMESTAMP(NANOS,true));
+      |}
+    """.stripMargin,
+    binaryAsString = false,
+    int96AsTimestamp = true,
+    writeLegacyParquetFormat = true,
+    expectedParquetColumn = Some(
+      ParquetColumn(
+        sparkType = StructType.fromAttributes(
+          ScalaReflection.attributesFor[Tuple1[Long]]),
+        descriptor = None,
+        repetitionLevel = 0,
+        definitionLevel = 0,
+        required = false,
+        path = Seq(),
+        children = Seq(
+          primitiveParquetColumn(LongType, PrimitiveTypeName.INT64, Repetition.REQUIRED,
+            0, 0, Seq("_1"), logicalTypeAnnotation = Some(LogicalTypeAnnotation.intType(64, false)))
+        ))),
+    nanosAsLong = true
+  )
+
   testSchemaInference[(Boolean, Int, Long, Float, Double, Array[Byte])](
     "basic types",
     """
@@ -1005,6 +1045,24 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
     }
   }
 
+  test("SPARK-40819: parquet file with TIMESTAMP(NANOS, true) (with nanosAsLong=true)") {
+    val tsAttribute = "birthday"
+    withSQLConf(SQLConf.LEGACY_PARQUET_NANOS_AS_LONG.key -> "true") {
+      val testDataPath = testFile("test-data/timestamp-nanos.parquet")
+      val data = spark.read.parquet(testDataPath).select(tsAttribute)
+      assert(data.schema.fields.head.dataType == LongType)
+      assert(data.orderBy(desc(tsAttribute)).take(1).head.getAs[Long](0) == 1668537129123534758L)
+    }
+  }
+
+  test("SPARK-40819: parquet file with TIMESTAMP(NANOS, true) (with default nanosAsLong=false)") {
+    val testDataPath = testFile("test-data/timestamp-nanos.parquet")
+    val e = intercept[SparkException] {
+      spark.read.parquet(testDataPath).collect()
+    }
+    assert(e.getMessage.contains("Illegal Parquet type: INT64 (TIMESTAMP(NANOS,true))"))
+  }
+
   // =======================================================
   // Tests for converting Parquet LIST to Catalyst ArrayType
   // =======================================================

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org