Ivan Sadikov created SPARK-45194:
------------------------------------

             Summary: Parquet reads fail with "RuntimeException: Unable to 
create Parquet converter for data type "timestamp_ntz" due to incorrect schema 
inference
                 Key: SPARK-45194
                 URL: https://issues.apache.org/jira/browse/SPARK-45194
             Project: Spark
          Issue Type: Bug
          Components: SQL
    Affects Versions: 3.5.0, 3.4.1, 4.0.0
            Reporter: Ivan Sadikov


I found that Parquet reads can fail due to incorrect schema inference when two 
conflicting types exist across files. This is caused by the fact that schema 
inference only considers one file by default, which may contain a different 
type than the other files.

{{spark.sql.parquet.mergeSchema}} is set to {{false}} by default. This causes 
schema inference to pick a single file (depending on the order in which the 
file system returns files) and infer the schema from that file alone. If the 
files contain conflicting types, or a smaller/narrower type is selected, the 
read does not fail during schema inference; instead, an exception is thrown 
during the subsequent read.
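For reference, schema merging can also be requested per read instead of via the session-wide default. This is a config sketch using the standard {{DataFrameReader}} API; it assumes a {{spark}} session is in scope and {{/path/to/dir}} is a placeholder:
{code:java}
// Opt in to schema merging for a single read (per-source option):
val merged = spark.read.option("mergeSchema", "true").parquet("/path/to/dir")

// Or flip the session-wide default (false out of the box):
spark.conf.set("spark.sql.parquet.mergeSchema", "true")
{code}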

In this case, the schema is inferred from the file with TIMESTAMP_NTZ, and the 
read fails on the file that contains TIMESTAMP_LTZ:
{code:java}
[info]   Cause: java.lang.RuntimeException: Unable to create Parquet converter 
for data type "timestamp_ntz" whose Parquet type is int64 
time(TIMESTAMP(MILLIS,true))
[info]   at 
org.apache.spark.sql.execution.datasources.parquet.ParquetVectorUpdaterFactory.convertErrorForTimestampNTZ(ParquetVectorUpdaterFactory.java:209)
[info]   at 
org.apache.spark.sql.execution.datasources.parquet.ParquetVectorUpdaterFactory.validateTimestampType(ParquetVectorUpdaterFactory.java:203)
[info]   at 
org.apache.spark.sql.execution.datasources.parquet.ParquetVectorUpdaterFactory.getUpdater(ParquetVectorUpdaterFactory.java:121)
[info]   at 
org.apache.spark.sql.execution.datasources.parquet.VectorizedColumnReader.readBatch(VectorizedColumnReader.java:175){code}
Note that if the file with TIMESTAMP_LTZ is selected, the read succeeds.


Here is the repro as a unit test that you can run in Spark master. Just add the 
test to ParquetIOSuite or some other test suite.
{code:java}
import org.apache.hadoop.conf._
import org.apache.hadoop.fs._
import org.apache.parquet.example.data.simple._
import org.apache.parquet.hadoop.example._
import org.apache.parquet.schema._

// Creates a Parquet file with two simple columns: integer and timestamp.
// Depending on the isUTC flag, the timestamp is either NTZ or LTZ.
private def createParquetFile(path: String, isUTC: Boolean): Unit = {
  val schema = MessageTypeParser.parseMessageType(
    s"""
    message schema {
      optional int32 a;
      optional int64 ts (TIMESTAMP(MILLIS, $isUTC));
    }
    """
  )
  val conf = new Configuration(false)
  conf.set("parquet.example.schema", schema.toString)
  val writer = ExampleParquetWriter.builder(new Path(path)).withConf(conf).build()
  for (i <- 0 until 2) {
    val group = new SimpleGroup(schema)
    group.add("a", 1)
    group.add("ts", System.currentTimeMillis)
    writer.write(group)
  }
  writer.close()
}

test("repro") {
  withTempPath { dir =>
    createParquetFile(dir + "/file-1.parquet", false) // NTZ
    createParquetFile(dir + "/file-2.parquet", true)  // LTZ

    val df = spark.read.parquet(dir.getAbsolutePath)
    df.show() // fails
  }
}
{code}
If you run the repro as is, you will get: 
{code:java}
[info]   Cause: java.lang.RuntimeException: Unable to create Parquet converter 
for data type "timestamp_ntz" whose Parquet type is int64 
time(TIMESTAMP(MILLIS,true)) {code}
If you swap the file names (so that the LTZ file is picked for inference), the 
read succeeds:
{code:java}
+---+--------------------+
|  a|                  ts|
+---+--------------------+
|  1|2023-09-17 21:59:...|
|  1|2023-09-17 21:59:...|
|  1|2023-09-17 21:59:...|
|  1|2023-09-17 21:59:...|
+---+--------------------+ {code}
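Since the read succeeds when the LTZ schema wins, a possible workaround (a sketch, not verified in this ticket) is to bypass inference entirely with an explicit schema. {{dir}} here refers to the same temp directory as in the repro above:
{code:java}
// Workaround sketch (assumption): pin the column to TIMESTAMP
// (i.e. TIMESTAMP_LTZ) explicitly so inference never picks the NTZ file.
val df = spark.read
  .schema("a INT, ts TIMESTAMP")
  .parquet(dir.getAbsolutePath)
{code}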
If you set {{spark.sql.parquet.mergeSchema}} to {{true}}, the schema inference 
fails with
{code:java}
[info]   org.apache.spark.SparkException: [CANNOT_MERGE_SCHEMAS] Failed merging 
schemas:
[info] Initial schema:
[info] "STRUCT<a: INT, ts: TIMESTAMP_NTZ>"
[info] Schema that cannot be merged with the initial schema:
[info] "STRUCT<a: INT, ts: TIMESTAMP>". {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
