mbutrovich opened a new issue, #3856:
URL: https://github.com/apache/datafusion-comet/issues/3856
### Describe the bug
CometIcebergNativeScan reads INT96 timestamps incorrectly, resulting in a
~1170-year offset.
**Example:**
- Correct (Spark/Java): `3332-12-14 11:33:10.965`
- Comet/iceberg-rust: `2163-11-05 13:24:03.545896`
### What's Affected?
**Column values**: WRONG - read via iceberg-rust's ArrowReader with the wrong `TimeUnit`

**Partition values**: CORRECT - stored in manifest files and computed by Iceberg-Java's `ParquetUtil.extractTimestampInt96()`, which correctly converts to microseconds:
```java
// iceberg/parquet/src/main/java/org/apache/iceberg/parquet/ParquetUtil.java:184-185
return TimeUnit.DAYS.toMicros(julianDay - UNIX_EPOCH_JULIAN)
    + TimeUnit.NANOSECONDS.toMicros(timeOfDayNanos);
```
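For reference, the same Julian-day arithmetic can be exercised in isolation. This is a standalone sketch (the class name and sample values are illustrative, not from Iceberg):

```java
import java.time.Instant;
import java.util.concurrent.TimeUnit;

public class Int96ToMicros {
    // Julian Day Number of the Unix epoch (1970-01-01).
    static final long UNIX_EPOCH_JULIAN = 2_440_588L;

    // Same arithmetic as the ParquetUtil excerpt above: an INT96 timestamp is
    // a Julian day number plus nanoseconds-within-day, and Iceberg wants
    // microseconds since the Unix epoch.
    static long int96ToMicros(long julianDay, long timeOfDayNanos) {
        return TimeUnit.DAYS.toMicros(julianDay - UNIX_EPOCH_JULIAN)
                + TimeUnit.NANOSECONDS.toMicros(timeOfDayNanos);
    }

    public static void main(String[] args) {
        // Illustrative values: one day after the epoch, 11:33:10.965 into the day.
        long micros = int96ToMicros(2_440_589L, 41_590_965_000_000L);
        Instant ts = Instant.ofEpochSecond(
                Math.floorDiv(micros, 1_000_000L),
                Math.floorMod(micros, 1_000_000L) * 1_000L);
        System.out.println(ts); // 1970-01-02T11:33:10.965Z
    }
}
```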
### The Bug (Column Values Only)
1. **arrow-rs** defaults INT96 to `Timestamp(Nanosecond, None)` (`parquet/src/arrow/schema/primitive.rs:122`)
2. **arrow-rs PR #7285** made INT96 reads respect schema hints: if you pass `Timestamp(Microsecond)`, the values are converted correctly
3. **iceberg-rust** passes the Parquet-derived schema to `ArrowReaderOptions.with_schema()` (`reader.rs:366`)
4. That schema has `Timestamp(Nanosecond)` (the Parquet default), not `Timestamp(Microsecond)` (the type the Iceberg spec requires)
5. So arrow-rs reads INT96 as nanoseconds, while Iceberg expects microseconds
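The "~1170 year offset" in the summary is consistent with i64 overflow of a nanosecond representation: epoch nanoseconds for dates near year 3333 exceed the signed 64-bit range (which spans only ~584 years), and the gap between the two example values above is almost exactly 2 × 2^64 ns. This is an observation from the reported numbers, not stated in the issue; the sketch below is standalone and illustrative (the report's `13:24:03` rendering presumably reflects the Spark session time zone, so only the date and microsecond fraction line up exactly):

```java
import java.math.BigInteger;
import java.time.Duration;
import java.time.Instant;

public class Int96WrapDemo {
    // Reinterpret an epoch-nanosecond count the way an i64-backed
    // Timestamp(Nanosecond) array would hold it: keep only the low 64 bits
    // (two's-complement wraparound).
    static Instant asWrappedNanos(Instant correct) {
        BigInteger nanos = BigInteger.valueOf(correct.getEpochSecond())
                .multiply(BigInteger.valueOf(1_000_000_000L))
                .add(BigInteger.valueOf(correct.getNano()));
        long wrapped = nanos.longValue(); // silently drops the high bits
        return Instant.ofEpochSecond(
                Math.floorDiv(wrapped, 1_000_000_000L),
                Math.floorMod(wrapped, 1_000_000_000L));
    }

    public static void main(String[] args) {
        // The correct value from the report, taken as UTC for illustration.
        Instant correct = Instant.parse("3332-12-14T11:33:10.965Z");
        Instant wrong = asWrappedNanos(correct);
        System.out.println(wrong); // 2163-11-05T12:24:03.545896768Z
        // The offset is two full i64 nanosecond ranges: 2 * 2^64 ns, ~1170 years.
        System.out.println(Duration.between(wrong, correct).toDays() / 365); // 1169
    }
}
```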
### Steps to reproduce
```scala
import scala.collection.JavaConverters._

import org.apache.comet.CometConf
import org.apache.spark.sql.CometTestBase
import org.apache.spark.sql.comet.CometIcebergNativeScanExec
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.internal.SQLConf

test("migration - INT96 timestamp with hour partitioning") {
  assume(icebergAvailable, "Iceberg not available in classpath")
  withTempIcebergDir { warehouseDir =>
    withSQLConf(
      "spark.sql.catalog.test_cat" -> "org.apache.iceberg.spark.SparkCatalog",
      "spark.sql.catalog.test_cat.type" -> "hadoop",
      "spark.sql.catalog.test_cat.warehouse" -> warehouseDir.getAbsolutePath,
      CometConf.COMET_ENABLED.key -> "true",
      CometConf.COMET_EXEC_ENABLED.key -> "true",
      CometConf.COMET_ICEBERG_NATIVE_ENABLED.key -> "true") {
      import org.apache.comet.testing.{DataGenOptions, FuzzDataGenerator}
      import org.apache.spark.sql.functions.monotonically_increasing_id
      import org.apache.spark.sql.types._

      val dataPath = s"${warehouseDir.getAbsolutePath}/int96_data"
      val numRows = 50
      val r = new scala.util.Random(42)

      // Schema for FuzzDataGenerator - just timestamp and value columns
      val fuzzSchema = StructType(Seq(
        StructField("outputTimestamp", TimestampType, nullable = true),
        StructField("value", DoubleType, nullable = true)))

      // Use FuzzDataGenerator with default options (year 3333 baseDate for INT96)
      val dataGenOptions = DataGenOptions(allowNull = false)
      val fuzzDf = FuzzDataGenerator.generateDataFrame(r, spark, fuzzSchema, numRows, dataGenOptions)

      // Add unique id and geohash columns
      val df = fuzzDf
        .withColumn("id", monotonically_increasing_id())
        .selectExpr(
          "id",
          "outputTimestamp",
          "concat(substring('0123456789bcdefghjkmnpqrstuvwxyz', 1 + int(id % 32), 1), " +
            "substring('0123456789bcdefghjkmnpqrstuvwxyz', 1 + int((id / 32) % 32), 1), " +
            "substring('0123456789bcdefghjkmnpqrstuvwxyz', 1 + int((id / 1024) % 32), 1)) as geohash3",
          "value")

      // Write Parquet with INT96 timestamps
      withSQLConf(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key -> "INT96") {
        df.write.mode("overwrite").parquet(dataPath)
      }

      // Verify the Parquet files actually contain INT96 timestamps
      val parquetFiles = new java.io.File(dataPath).listFiles()
        .filter(f => f.getName.endsWith(".parquet"))
      assert(parquetFiles.nonEmpty, "Expected at least one Parquet file")
      val parquetFile = parquetFiles.head
      val reader = org.apache.parquet.hadoop.ParquetFileReader.open(
        org.apache.parquet.hadoop.util.HadoopInputFile.fromPath(
          new org.apache.hadoop.fs.Path(parquetFile.getAbsolutePath),
          spark.sessionState.newHadoopConf()))
      try {
        val parquetSchema = reader.getFooter.getFileMetaData.getSchema
        val timestampColumn = parquetSchema.getColumns.asScala
          .find(_.getPath.mkString(".") == "outputTimestamp")
        assert(timestampColumn.isDefined, "Expected outputTimestamp column in Parquet schema")
        assert(
          timestampColumn.get.getPrimitiveType.getPrimitiveTypeName ==
            org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT96,
          s"Expected INT96 type for outputTimestamp but got " +
            s"${timestampColumn.get.getPrimitiveType.getPrimitiveTypeName}")
      } finally {
        reader.close()
      }

      // Create Iceberg table with hour(timestamp) + truncate(geohash, 3) partitioning
      spark.sql("CREATE NAMESPACE IF NOT EXISTS test_cat.db")
      spark.sql(s"""
        CREATE TABLE test_cat.db.int96_hour_test (
          id BIGINT,
          outputTimestamp TIMESTAMP,
          geohash3 STRING,
          value DOUBLE
        ) USING iceberg
        PARTITIONED BY (hours(outputTimestamp), truncate(geohash3, 3))
      """)

      // Use SparkTableUtil.importSparkTable to import the Parquet files
      try {
        val tableUtilClass = Class.forName("org.apache.iceberg.spark.SparkTableUtil")
        val sparkCatalog = spark.sessionState.catalogManager
          .catalog("test_cat")
          .asInstanceOf[org.apache.iceberg.spark.SparkCatalog]
        val ident = org.apache.spark.sql.connector.catalog.Identifier.of(Array("db"), "int96_hour_test")
        val sparkTable = sparkCatalog.loadTable(ident)
          .asInstanceOf[org.apache.iceberg.spark.source.SparkTable]
        val table = sparkTable.table()
        val stagingDir = s"${warehouseDir.getAbsolutePath}/staging"

        // Create a temp table pointing to the parquet path
        spark.sql(s"""CREATE TABLE parquet_temp USING parquet LOCATION '$dataPath'""")
        val sourceIdent = new org.apache.spark.sql.catalyst.TableIdentifier("parquet_temp")
        val importMethod = tableUtilClass.getMethod(
          "importSparkTable",
          classOf[org.apache.spark.sql.SparkSession],
          classOf[org.apache.spark.sql.catalyst.TableIdentifier],
          classOf[org.apache.iceberg.Table],
          classOf[String])
        importMethod.invoke(null, spark, sourceIdent, table, stagingDir)

        // Query the table and verify no duplicates
        val distinctCount = spark
          .sql("SELECT COUNT(DISTINCT id) FROM test_cat.db.int96_hour_test")
          .collect()(0)
          .getLong(0)
        assert(distinctCount == numRows, s"Expected $numRows distinct IDs but got $distinctCount")

        checkIcebergNativeScan("SELECT * FROM test_cat.db.int96_hour_test ORDER BY id")
        checkIcebergNativeScan(
          "SELECT id, outputTimestamp FROM test_cat.db.int96_hour_test WHERE id < 50 ORDER BY id")

        spark.sql("DROP TABLE test_cat.db.int96_hour_test")
        spark.sql("DROP TABLE parquet_temp")
      } catch {
        case _: ClassNotFoundException =>
          cancel("SparkTableUtil not available")
      }
    }
  }
}
```
### Expected behavior
We should get the same results back as reading through Iceberg Java.
### Additional context
#### Fix Location
The fix needs to be in **iceberg-rust**, not Comet:
- Modify `reader.rs` to overlay Iceberg schema types onto the Parquet-derived schema before passing it to `ArrowReaderOptions`
- For timestamp columns, ensure `Timestamp(Microsecond, ...)` is passed, triggering arrow-rs's INT96 conversion logic from PR #7285

#### Files to Modify
1. `crates/iceberg/src/arrow/reader.rs` (iceberg-rust)
   - After building `arrow_schema` from Parquet, overlay the Iceberg schema's timestamp types
   - Specifically, for columns whose Iceberg type is Timestamp/Timestamptz, ensure the Arrow schema has `Timestamp(Microsecond, ...)`
#### Related PRs
- arrow-rs #7285: Support different TimeUnits and timezones when reading Timestamps from INT96
- datafusion #15537: INT96 handling in DataFusion
- comet #1652: INT96 handling in Comet
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]