mbutrovich opened a new issue, #3860:
URL: https://github.com/apache/datafusion-comet/issues/3860
### Describe the bug
iceberg-rust's `PredicateConverter` fails with "Leave column `id` in
predicates isn't a root column in Parquet schema" when all three conditions are
met:
1. **Migrated table** — Parquet files were written by Spark without Iceberg
field IDs, then imported via `SparkTableUtil.importSparkTable()`
2. **Nested types** in the schema (struct, array, or map)
3. **Filter predicate on a root column that appears after nested types** in
the column ordering
### Root Cause
The `column_map` in iceberg-rust's `PredicateConverter`
(`crates/iceberg/src/arrow/reader.rs:1609`) maps Iceberg field IDs to Parquet
leaf column indices. For migrated files (no embedded Iceberg field IDs),
iceberg-rust falls back to name-based mapping. This mapping produces incorrect
leaf indices when nested types are present, causing a flat column like `id` to
be mapped to a leaf index inside a group (struct/map/array).
The check at `reader.rs:1622`:
```rust
if self.parquet_schema.get_column_root(*column_idx).is_group() {
    return Err(...)
}
```
then fails because `get_column_root()` returns the enclosing group rather
than `id` itself.
Note: Tables created directly through Iceberg (which embed field IDs in
Parquet metadata) are NOT affected. The bug is specific to the name-mapping
fallback path used for migrated files.
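To make the index mismatch concrete, here is a self-contained sketch (plain Rust, no Parquet dependency; the leaf paths are written out by hand from the repro schema below). It shows one plausible way a name-based fallback goes wrong — reusing the root-field position as a leaf index — though the exact mechanism inside iceberg-rust's `column_map` may differ:

```rust
fn main() {
    // Parquet flattens a schema into leaf columns in depth-first order.
    // For the repro schema, the leaves are:
    let leaves = [
        "info.age",              // leaf 0 (root: info, a group)
        "info.score",            // leaf 1 (root: info)
        "tags.list.element",     // leaf 2 (root: tags, a group)
        "props.key_value.key",   // leaf 3 (root: props, a group)
        "props.key_value.value", // leaf 4 (root: props)
        "id",                    // leaf 5 (root: id, a primitive)
    ];
    // Root-field order in the table schema:
    let roots = ["info", "tags", "props", "id"];

    // A naive fallback that reuses the root-field position as a leaf
    // index maps `id` to 3 instead of 5:
    let naive = roots.iter().position(|r| *r == "id").unwrap(); // 3
    let correct = leaves.iter().position(|l| *l == "id").unwrap(); // 5
    assert_eq!((naive, correct), (3, 5));

    // get_column_root(3) returns the `props` group, not `id`, which is
    // exactly what trips the is_group() check quoted above.
    let root_of = |leaf: &str| leaf.split('.').next().unwrap().to_string();
    assert_eq!(root_of(leaves[naive]), "props"); // enclosing group
    assert_eq!(root_of(leaves[correct]), "id");  // the intended column
}
```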
### Steps to reproduce
```scala
test("filter with nested types in migrated table") {
  assume(icebergAvailable, "Iceberg not available in classpath")
  withTempIcebergDir { warehouseDir =>
    withSQLConf(
      "spark.sql.catalog.test_cat" -> "org.apache.iceberg.spark.SparkCatalog",
      "spark.sql.catalog.test_cat.type" -> "hadoop",
      "spark.sql.catalog.test_cat.warehouse" -> warehouseDir.getAbsolutePath,
      CometConf.COMET_ENABLED.key -> "true",
      CometConf.COMET_EXEC_ENABLED.key -> "true",
      CometConf.COMET_ICEBERG_NATIVE_ENABLED.key -> "true") {
      val dataPath = s"${warehouseDir.getAbsolutePath}/nested_data"
      // Write Parquet WITHOUT Iceberg (simulates pre-migration data).
      // id is last so its leaf index is after all nested type leaves.
      spark.sql(s"""
        SELECT
          named_struct('age', id * 10, 'score', id * 1.5) AS info,
          array(id, id + 1) AS tags,
          map('key', id) AS props,
          id
        FROM range(10)
      """).write.parquet(dataPath)
      spark.sql("CREATE NAMESPACE IF NOT EXISTS test_cat.db")
      spark.sql(s"""
        CREATE TABLE test_cat.db.nested_migrate (
          info STRUCT<age: BIGINT, score: DOUBLE>,
          tags ARRAY<BIGINT>,
          props MAP<STRING, BIGINT>,
          id BIGINT
        ) USING iceberg
      """)
      try {
        val tableUtilClass =
          Class.forName("org.apache.iceberg.spark.SparkTableUtil")
        val sparkCatalog = spark.sessionState.catalogManager
          .catalog("test_cat")
          .asInstanceOf[org.apache.iceberg.spark.SparkCatalog]
        val ident = org.apache.spark.sql.connector.catalog.Identifier
          .of(Array("db"), "nested_migrate")
        val sparkTable = sparkCatalog
          .loadTable(ident)
          .asInstanceOf[org.apache.iceberg.spark.source.SparkTable]
        val table = sparkTable.table()
        val stagingDir = s"${warehouseDir.getAbsolutePath}/staging"
        spark.sql(
          s"CREATE TABLE parquet_temp USING parquet LOCATION '$dataPath'")
        val sourceIdent =
          new org.apache.spark.sql.catalyst.TableIdentifier("parquet_temp")
        val importMethod = tableUtilClass.getMethod(
          "importSparkTable",
          classOf[org.apache.spark.sql.SparkSession],
          classOf[org.apache.spark.sql.catalyst.TableIdentifier],
          classOf[org.apache.iceberg.Table],
          classOf[String])
        importMethod.invoke(null, spark, sourceIdent, table, stagingDir)
        // Select only flat columns to avoid Spark's Iceberg reader returning
        // null for struct fields in migrated tables (separate Spark bug).
        checkIcebergNativeScan(
          "SELECT id FROM test_cat.db.nested_migrate ORDER BY id")
        // Filter on a root column with nested types in a migrated table:
        // Parquet files lack Iceberg field IDs, so iceberg-rust falls back to
        // name mapping, where column_map resolution is broken for nested types.
        checkIcebergNativeScan(
          "SELECT id FROM test_cat.db.nested_migrate WHERE id > 5 ORDER BY id")
        spark.sql("DROP TABLE test_cat.db.nested_migrate")
        spark.sql("DROP TABLE parquet_temp")
      } catch {
        case _: ClassNotFoundException =>
          cancel("SparkTableUtil not available")
      }
    }
  }
}
```
### Expected behavior
Queries with filter predicates should work on migrated tables regardless of
whether the schema contains nested types.
#### Additional context
- The residual predicate is serialized by Comet from the Iceberg
`FileScanTask.residual()` and sent to iceberg-rust for row-group pruning.
- Without the predicate, the scan works fine — the filter is still applied
post-scan by `CometFilter`.
- Column ordering matters: if the filtered column appears before all nested
types in the schema, the bug does not trigger (the leaf index happens to be
correct).
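The ordering dependence can be illustrated with a small standalone sketch (plain Rust; the position-based fallback is an assumption about the failure mode, and the leaf paths are hand-written from the repro schema):

```rust
fn main() {
    // Find a name's index within a slice of column names.
    let pos = |s: &[&str], name: &str| s.iter().position(|x| *x == name).unwrap();

    // `id` declared first: root position and leaf index coincide at 0,
    // so a position-based name fallback is accidentally correct.
    let leaves_id_first = ["id", "info.age", "info.score", "tags.list.element",
        "props.key_value.key", "props.key_value.value"];
    let roots_id_first = ["id", "info", "tags", "props"];
    assert_eq!(pos(&roots_id_first, "id"), pos(&leaves_id_first, "id"));

    // `id` declared last (the repro schema): the indices diverge (3 vs 5)
    // and the bug triggers.
    let leaves_id_last = ["info.age", "info.score", "tags.list.element",
        "props.key_value.key", "props.key_value.value", "id"];
    let roots_id_last = ["info", "tags", "props", "id"];
    assert_ne!(pos(&roots_id_last, "id"), pos(&leaves_id_last, "id"));
}
```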
#### Possible Comet workaround
Skip serializing residual predicates when the table schema contains nested
types. This is safe because the filter is still applied post-scan anyway, at
the cost of losing row-group pruning for those tables.
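A minimal sketch of such a guard (the `DType` model, `has_nested_types`, and `should_serialize_residual` are hypothetical stand-ins, not Comet's actual Spark `DataType` handling):

```rust
// Hypothetical miniature model of a table schema's top-level field types.
enum DType {
    Primitive,
    Struct(Vec<(String, DType)>),
    Array(Box<DType>),
    Map(Box<DType>, Box<DType>),
}

// True if any top-level field is a struct, array, or map.
fn has_nested_types(schema: &[(String, DType)]) -> bool {
    schema.iter().any(|(_, t)| !matches!(t, DType::Primitive))
}

// Skip residual-predicate serialization when nested types are present;
// the filter is still applied post-scan by CometFilter, so results stay
// correct and only row-group pruning is lost.
fn should_serialize_residual(schema: &[(String, DType)]) -> bool {
    !has_nested_types(schema)
}

fn main() {
    // The repro schema: nested types present, `id` last.
    let migrated = vec![
        ("info".to_string(), DType::Struct(vec![
            ("age".to_string(), DType::Primitive),
            ("score".to_string(), DType::Primitive),
        ])),
        ("tags".to_string(), DType::Array(Box::new(DType::Primitive))),
        ("props".to_string(),
            DType::Map(Box::new(DType::Primitive), Box::new(DType::Primitive))),
        ("id".to_string(), DType::Primitive),
    ];
    assert!(!should_serialize_residual(&migrated)); // guard kicks in

    let flat = vec![("id".to_string(), DType::Primitive)];
    assert!(should_serialize_residual(&flat)); // pruning kept for flat schemas
}
```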
#### Related
- iceberg-rust `PredicateConverter`:
[`crates/iceberg/src/arrow/reader.rs:1604-1649`](https://github.com/apache/iceberg-rust/blob/main/crates/iceberg/src/arrow/reader.rs#L1604-L1649)
- iceberg-rust `project_column`:
[`crates/iceberg/src/arrow/reader.rs:1666-1679`](https://github.com/apache/iceberg-rust/blob/main/crates/iceberg/src/arrow/reader.rs#L1666-L1679)
(which also notes "Only supports top-level columns for now")
- Original discovery: while testing INT96 fix in [iceberg-rust
#2301](https://github.com/apache/iceberg-rust/pull/2301) via [comet
#3857](https://github.com/apache/datafusion-comet/pull/3857)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]