Ivan Sadikov created SPARK-45194:
------------------------------------

             Summary: Parquet reads fail with "RuntimeException: Unable to create Parquet converter for data type "timestamp_ntz"" due to incorrect schema inference
                 Key: SPARK-45194
                 URL: https://issues.apache.org/jira/browse/SPARK-45194
             Project: Spark
          Issue Type: Bug
          Components: SQL
    Affects Versions: 3.5.0, 3.4.1, 4.0.0
            Reporter: Ivan Sadikov
I found that Parquet reads can fail due to incorrect schema inference when two conflicting types exist across files. This is caused by the fact that, with {{spark.sql.parquet.mergeSchema}} set to {{false}} (the default), schema inference only considers one file, which may contain different types than the other files. Inference picks a file (depending on the order in which the file system returns files) and derives the schema from that file alone. However, if the files have conflicting types, or a smaller/narrower type is selected, instead of failing during schema inference, an exception is thrown during the subsequent read. In this case, we infer the schema from the file with TIMESTAMP_NTZ and fail to read the file that contains TIMESTAMP_LTZ:
{code:java}
[info] Cause: java.lang.RuntimeException: Unable to create Parquet converter for data type "timestamp_ntz" whose Parquet type is int64 time(TIMESTAMP(MILLIS,true))
[info]   at org.apache.spark.sql.execution.datasources.parquet.ParquetVectorUpdaterFactory.convertErrorForTimestampNTZ(ParquetVectorUpdaterFactory.java:209)
[info]   at org.apache.spark.sql.execution.datasources.parquet.ParquetVectorUpdaterFactory.validateTimestampType(ParquetVectorUpdaterFactory.java:203)
[info]   at org.apache.spark.sql.execution.datasources.parquet.ParquetVectorUpdaterFactory.getUpdater(ParquetVectorUpdaterFactory.java:121)
[info]   at org.apache.spark.sql.execution.datasources.parquet.VectorizedColumnReader.readBatch(VectorizedColumnReader.java:175)
{code}
Note that if the file with TIMESTAMP_LTZ is selected instead, the read succeeds.

Here is the repro as a unit test that you can run on Spark master. Just add the test to ParquetIOSuite or another test suite.
{code:java}
import org.apache.hadoop.conf._
import org.apache.hadoop.fs._
import org.apache.parquet.example.data.simple._
import org.apache.parquet.hadoop.example._
import org.apache.parquet.schema._

// Creates a Parquet file with two simple columns: integer and timestamp.
// Depending on the isUTC flag, the timestamp is either NTZ (false) or LTZ (true).
private def createParquetFile(path: String, isUTC: Boolean): Unit = {
  val schema = MessageTypeParser.parseMessageType(
    s"""
    message schema {
      optional int32 a;
      optional int64 ts (TIMESTAMP(MILLIS, $isUTC));
    }
    """
  )

  val conf = new Configuration(false)
  conf.set("parquet.example.schema", schema.toString)

  val writer = ExampleParquetWriter.builder(new Path(path)).withConf(conf).build()
  for (i <- 0 until 2) {
    val group = new SimpleGroup(schema)
    group.add("a", 1)
    group.add("ts", System.currentTimeMillis)
    writer.write(group)
  }
  writer.close()
}

test("repro") {
  withTempPath { dir =>
    createParquetFile(dir + "/file-1.parquet", false) // NTZ
    createParquetFile(dir + "/file-2.parquet", true) // LTZ

    val df = spark.read.parquet(dir.getAbsolutePath)
    df.show() // fails
  }
}
{code}
If you run the repro as is, you will get:
{code:java}
[info] Cause: java.lang.RuntimeException: Unable to create Parquet converter for data type "timestamp_ntz" whose Parquet type is int64 time(TIMESTAMP(MILLIS,true))
{code}
If you swap the file names, the read succeeds:
{code:java}
+---+--------------------+
|  a|                  ts|
+---+--------------------+
|  1|2023-09-17 21:59:...|
|  1|2023-09-17 21:59:...|
|  1|2023-09-17 21:59:...|
|  1|2023-09-17 21:59:...|
+---+--------------------+
{code}
If you set {{spark.sql.parquet.mergeSchema}} to {{true}}, the schema inference fails with:
{code:java}
[info] org.apache.spark.SparkException: [CANNOT_MERGE_SCHEMAS] Failed merging schemas:
[info] Initial schema:
[info] "STRUCT<a: INT, ts: TIMESTAMP_NTZ>"
[info] Schema that cannot be merged with the initial schema:
[info] "STRUCT<a: INT, ts: TIMESTAMP>".
{code}

--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
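A possible workaround until inference is fixed, based on the observation above that the read succeeds under the TIMESTAMP_LTZ schema: bypass inference by supplying the schema explicitly, or opt into schema merging for the affected read so the conflict surfaces at inference time rather than mid-scan. A minimal sketch, assuming the same two-file layout as the repro ({{dir}} is the test's temp directory):
{code:java}
// Workaround sketch (assumes the repro's layout above; not a fix for the bug).
// 1) Supply the schema explicitly; per the note above, reading both files
//    with the TIMESTAMP (LTZ) schema succeeds.
val df = spark.read
  .schema("a INT, ts TIMESTAMP")
  .parquet(dir.getAbsolutePath)
df.show()

// 2) Or enable schema merging for this read only, which fails fast at
//    inference time with CANNOT_MERGE_SCHEMAS instead of during the scan.
val merged = spark.read
  .option("mergeSchema", "true")
  .parquet(dir.getAbsolutePath)
{code}
Option 2 does not make the read succeed, but it moves the failure to schema inference, where the conflicting files are reported explicitly.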