mbutrovich opened a new issue, #3856:
URL: https://github.com/apache/datafusion-comet/issues/3856

   ### Describe the bug
   
   CometIcebergNativeScan reads INT96 timestamps incorrectly, resulting in a 
~1170-year offset.
   
   **Example:**
   - Correct (Spark/Java): `3332-12-14 11:33:10.965`
   - Comet/iceberg-rust: `2163-11-05 13:24:03.545896`
   
   ### What's Affected?
   
   **Column values**: WRONG - read via iceberg-rust's ArrowReader with wrong 
TimeUnit
   
   **Partition values**: CORRECT - stored in manifest files, computed by 
Iceberg-Java's `ParquetUtil.extractTimestampInt96()`, which correctly converts 
to microseconds:
   ```java
   // 
iceberg/parquet/src/main/java/org/apache/iceberg/parquet/ParquetUtil.java:184-185
   return TimeUnit.DAYS.toMicros(julianDay - UNIX_EPOCH_JULIAN)
       + TimeUnit.NANOSECONDS.toMicros(timeOfDayNanos);
   ```
   ### The Bug (Column Values Only)
   1. **arrow-rs** defaults INT96 to `Timestamp(Nanosecond, None)` 
(`parquet/src/arrow/schema/primitive.rs:122`)
   2. **arrow-rs PR #7285** made arrow-rs respect schema hints - if you pass 
`Timestamp(Microsecond)`, it converts correctly
   3. **iceberg-rust** passes the Parquet-derived schema to 
`ArrowReaderOptions.with_schema()` (`reader.rs:366`)
   4. This schema has `Timestamp(Nanosecond)` (from Parquet default), NOT 
`Timestamp(Microsecond)` (from Iceberg spec)
   5. So arrow-rs reads INT96 as nanoseconds, but Iceberg expects microseconds
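
   The ~1170-year offset is consistent with i64 overflow: nanosecond timestamps can only represent instants up to 2262-04-11, so a year-3332 value wraps. A minimal sketch of the arithmetic (the INT96 components below are reconstructed for illustration; `2_440_588` is the Julian day of the Unix epoch, as in `ParquetUtil`):

   ```rust
   fn main() {
       const UNIX_EPOCH_JULIAN: i64 = 2_440_588;
       // Reconstructed INT96 components for 3332-12-14 11:33:10.965 UTC
       let julian_day: i64 = 2_938_396;                 // 497_808 days after epoch
       let time_of_day_nanos: i64 = 41_590_965_000_000; // 11:33:10.965

       // Iceberg's conversion: microseconds since epoch (fits easily in i64)
       let micros = (julian_day - UNIX_EPOCH_JULIAN) * 86_400_000_000
           + time_of_day_nanos / 1_000;
       assert_eq!(micros, 43_010_652_790_965_000);

       // The same instant as i64 *nanoseconds* overflows (i64::MAX ~ 9.2e18):
       assert!(micros.checked_mul(1_000).is_none());
       let wrapped = micros.wrapping_mul(1_000);
       // 6_117_164_643_545_896_768 ns lands in 2163-11-05 ...545896, i.e. the
       // wrong value reported above
       println!("{wrapped}");
   }
   ```

   Note that the fractional seconds of the wrapped value (`.545896`) match the incorrect Comet result exactly, which is what points at overflow in the nanosecond representation rather than a simple unit mix-up.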
   
   ### Steps to reproduce
   
   ```scala
   import scala.collection.JavaConverters._
   
   import org.apache.comet.CometConf
   import org.apache.spark.sql.CometTestBase
   import org.apache.spark.sql.comet.CometIcebergNativeScanExec
   import org.apache.spark.sql.execution.SparkPlan
   import org.apache.spark.sql.internal.SQLConf
   
   
   test("migration - INT96 timestamp with hour partitioning") {
     assume(icebergAvailable, "Iceberg not available in classpath")
   
     withTempIcebergDir { warehouseDir =>
       withSQLConf(
         "spark.sql.catalog.test_cat" -> 
"org.apache.iceberg.spark.SparkCatalog",
         "spark.sql.catalog.test_cat.type" -> "hadoop",
         "spark.sql.catalog.test_cat.warehouse" -> warehouseDir.getAbsolutePath,
         CometConf.COMET_ENABLED.key -> "true",
         CometConf.COMET_EXEC_ENABLED.key -> "true",
         CometConf.COMET_ICEBERG_NATIVE_ENABLED.key -> "true") {
   
         import org.apache.comet.testing.{DataGenOptions, FuzzDataGenerator}
         import org.apache.spark.sql.functions.monotonically_increasing_id
         import org.apache.spark.sql.types._
   
         val dataPath = s"${warehouseDir.getAbsolutePath}/int96_data"
         val numRows = 50
         val r = new scala.util.Random(42)
   
         // Schema for FuzzDataGenerator - just timestamp and value columns
         val fuzzSchema = StructType(Seq(
           StructField("outputTimestamp", TimestampType, nullable = true),
           StructField("value", DoubleType, nullable = true)))
   
         // Use FuzzDataGenerator with default options (year 3333 baseDate for 
INT96)
         val dataGenOptions = DataGenOptions(allowNull = false)
         val fuzzDf = FuzzDataGenerator.generateDataFrame(r, spark, fuzzSchema, 
numRows, dataGenOptions)
   
         // Add unique id and geohash columns
         val df = fuzzDf
           .withColumn("id", monotonically_increasing_id())
           .selectExpr(
             "id",
             "outputTimestamp",
             "concat(substring('0123456789bcdefghjkmnpqrstuvwxyz', 1 + int(id % 
32), 1), " +
               "substring('0123456789bcdefghjkmnpqrstuvwxyz', 1 + int((id / 32) 
% 32), 1), " +
               "substring('0123456789bcdefghjkmnpqrstuvwxyz', 1 + int((id / 
1024) % 32), 1)) as geohash3",
             "value")
   
         // Write Parquet with INT96 timestamps
         withSQLConf(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key -> "INT96") {
           df.write.mode("overwrite").parquet(dataPath)
         }
   
         // Verify the Parquet files actually contain INT96 timestamps
         val parquetFiles = new java.io.File(dataPath).listFiles()
           .filter(f => f.getName.endsWith(".parquet"))
         assert(parquetFiles.nonEmpty, "Expected at least one Parquet file")
   
         val parquetFile = parquetFiles.head
         val reader = org.apache.parquet.hadoop.ParquetFileReader.open(
           org.apache.parquet.hadoop.util.HadoopInputFile.fromPath(
             new org.apache.hadoop.fs.Path(parquetFile.getAbsolutePath),
             spark.sessionState.newHadoopConf()))
         try {
           val parquetSchema = reader.getFooter.getFileMetaData.getSchema
           val timestampColumn = parquetSchema.getColumns.asScala
             .find(_.getPath.mkString(".") == "outputTimestamp")
           assert(timestampColumn.isDefined, "Expected outputTimestamp column 
in Parquet schema")
           assert(
             timestampColumn.get.getPrimitiveType.getPrimitiveTypeName ==
               org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT96,
             s"Expected INT96 type for outputTimestamp but got 
${timestampColumn.get.getPrimitiveType.getPrimitiveTypeName}")
         } finally {
           reader.close()
         }
   
         // Create Iceberg table with hour(timestamp) + truncate(geohash, 3) 
partitioning
         spark.sql("CREATE NAMESPACE IF NOT EXISTS test_cat.db")
         spark.sql(s"""
           CREATE TABLE test_cat.db.int96_hour_test (
             id BIGINT,
             outputTimestamp TIMESTAMP,
             geohash3 STRING,
             value DOUBLE
           ) USING iceberg
           PARTITIONED BY (hours(outputTimestamp), truncate(geohash3, 3))
         """)
   
         // Use SparkTableUtil.importSparkTable to import the Parquet files
         try {
           val tableUtilClass = 
Class.forName("org.apache.iceberg.spark.SparkTableUtil")
           val sparkCatalog = spark.sessionState.catalogManager
             .catalog("test_cat")
             .asInstanceOf[org.apache.iceberg.spark.SparkCatalog]
           val ident = 
org.apache.spark.sql.connector.catalog.Identifier.of(Array("db"), 
"int96_hour_test")
           val sparkTable = sparkCatalog.loadTable(ident)
             .asInstanceOf[org.apache.iceberg.spark.source.SparkTable]
           val table = sparkTable.table()
   
           val stagingDir = s"${warehouseDir.getAbsolutePath}/staging"
   
           // Create a temp table pointing to the parquet path
           spark.sql(s"""CREATE TABLE parquet_temp USING parquet LOCATION 
'$dataPath'""")
           val sourceIdent = new 
org.apache.spark.sql.catalyst.TableIdentifier("parquet_temp")
   
           val importMethod = tableUtilClass.getMethod(
             "importSparkTable",
             classOf[org.apache.spark.sql.SparkSession],
             classOf[org.apache.spark.sql.catalyst.TableIdentifier],
             classOf[org.apache.iceberg.Table],
             classOf[String])
           importMethod.invoke(null, spark, sourceIdent, table, stagingDir)
   
           // Query the table and verify no duplicates
           val distinctCount = spark
             .sql("SELECT COUNT(DISTINCT id) FROM test_cat.db.int96_hour_test")
             .collect()(0)
             .getLong(0)
           assert(distinctCount == numRows, s"Expected $numRows distinct IDs 
but got $distinctCount")
   
           checkIcebergNativeScan("SELECT * FROM test_cat.db.int96_hour_test 
ORDER BY id")
           checkIcebergNativeScan(
             "SELECT id, outputTimestamp FROM test_cat.db.int96_hour_test WHERE 
id < 50 ORDER BY id")
   
           spark.sql("DROP TABLE test_cat.db.int96_hour_test")
           spark.sql("DROP TABLE parquet_temp")
         } catch {
           case _: ClassNotFoundException =>
             cancel("SparkTableUtil not available")
         }
       }
     }
   }
   ```
   
   ### Expected behavior
   
   We should get the same results as when reading through Iceberg Java.
   
   ### Additional context
   
   ## Fix Location
   
   The fix needs to be in **iceberg-rust**, not Comet:
   - Modify `reader.rs` to overlay Iceberg schema types onto the 
Parquet-derived schema before passing to `ArrowReaderOptions`
   - For timestamp columns, ensure `Timestamp(Microsecond, ...)` is passed, 
triggering arrow-rs's INT96 conversion logic from PR #7285
   
   ## Files to Modify
   
   1. `crates/iceberg/src/arrow/reader.rs` (in iceberg-rust)
      - After building `arrow_schema` from Parquet, overlay Iceberg schema's 
timestamp types
      - Specifically, for columns where Iceberg type is Timestamp/Timestamptz, 
ensure Arrow schema has `Timestamp(Microsecond, ...)`
   ## Related PRs
   - arrow-rs #7285: Support different TimeUnits and timezones when reading 
Timestamps from INT96
   - datafusion #15537: INT96 handling in DataFusion
   - comet #1652: INT96 handling in Comet


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

