emkornfield commented on code in PR #2301:
URL: https://github.com/apache/iceberg-rust/pull/2301#discussion_r3017868437


##########
crates/iceberg/src/arrow/reader.rs:
##########
@@ -1232,6 +1257,202 @@ fn add_fallback_field_ids_to_arrow_schema(arrow_schema: 
&ArrowSchemaRef) -> Arc<
     ))
 }
 
+/// Coerce Arrow schema types for INT96 columns to match the Iceberg table 
schema.
+///
+/// arrow-rs defaults INT96 to `Timestamp(Nanosecond)`, which overflows i64 
for dates outside
+/// ~1677-2262. We use arrow-rs's schema hint mechanism to read INT96 at the 
resolution
+/// specified by the Iceberg schema (`timestamp` → microsecond, `timestamp_ns` 
→ nanosecond).
+///
+/// Iceberg Java handles this differently: it bypasses parquet-mr with a 
custom column reader
+/// (`GenericParquetReaders.TimestampInt96Reader`). We achieve the same result 
via schema hints.
+///
+/// References:
+/// - Iceberg spec primitive types: 
<https://iceberg.apache.org/spec/#primitive-types>
+/// - arrow-rs schema hint support: 
<https://github.com/apache/arrow-rs/pull/7285>
+fn coerce_int96_timestamps(
+    parquet_schema: &SchemaDescriptor,
+    arrow_schema: &ArrowSchemaRef,
+    iceberg_schema: &Schema,
+) -> Option<Arc<ArrowSchema>> {
+    let int96_paths: Vec<ColumnPath> = parquet_schema
+        .columns()
+        .iter()
+        .filter(|col| col.physical_type() == PhysicalType::INT96)
+        .map(|col| col.path().clone())
+        .collect();
+
+    if int96_paths.is_empty() {
+        return None;
+    }
+
+    let mut fields: Vec<FieldRef> = 
arrow_schema.fields().iter().cloned().collect();
+    let mut any_changed = false;
+
+    for path in &int96_paths {
+        let parts = path.parts();
+        if let Some(idx) = fields.iter().position(|f| f.name() == &parts[0]) {
+            let (new_field, changed) = coerce_field_at_path(&fields[idx], 
parts, 0, iceberg_schema);
+            if changed {
+                fields[idx] = new_field;
+                any_changed = true;
+            }
+        }
+    }
+
+    if any_changed {
+        Some(Arc::new(ArrowSchema::new_with_metadata(
+            fields,
+            arrow_schema.metadata().clone(),
+        )))
+    } else {
+        None
+    }
+}
+
+/// Navigate an Arrow field tree using a Parquet column path and coerce the 
leaf
+/// INT96 `Timestamp(Nanosecond)` to the resolution specified by the Iceberg 
schema.
+///
+/// Parquet column paths include intermediate group names for nested types 
(e.g. the
+/// repeated group in LIST encoding). This function accounts for those extra 
levels
+/// when descending through List, LargeList, and Map Arrow types.
+fn coerce_field_at_path(
+    field: &FieldRef,
+    parquet_path: &[String],
+    depth: usize,
+    iceberg_schema: &Schema,
+) -> (FieldRef, bool) {
+    if depth == parquet_path.len() - 1 {
+        return coerce_timestamp_field(field, iceberg_schema);
+    }
+
+    match field.data_type() {
+        DataType::Struct(fields) => {
+            let child_name = &parquet_path[depth + 1];
+            let mut new_fields: Vec<FieldRef> = 
fields.iter().cloned().collect();
+            let mut changed = false;
+
+            if let Some(idx) = new_fields.iter().position(|f| f.name() == 
child_name) {
+                let (new_child, child_changed) =
+                    coerce_field_at_path(&new_fields[idx], parquet_path, depth 
+ 1, iceberg_schema);
+                if child_changed {
+                    new_fields[idx] = new_child;
+                    changed = true;
+                }
+            }
+
+            if changed {
+                let new_field = Arc::new(
+                    Field::new(
+                        field.name(),
+                        DataType::Struct(Fields::from(new_fields)),
+                        field.is_nullable(),
+                    )
+                    .with_metadata(field.metadata().clone()),
+                );
+                (new_field, true)
+            } else {
+                (Arc::clone(field), false)
+            }
+        }
+        DataType::List(element_field) | DataType::LargeList(element_field) => {
+            // Parquet 3-level LIST encoding inserts a repeated group between 
the list
+            // and its element, e.g. `my_list.list.element` where `list` is the
+            // intermediate group. The group name varies across writers — the 
spec says
+            // "list" but legacy data uses "element", "array", 
`<parent>_tuple`, etc.
+            // We skip it (depth + 1) since we navigate by Parquet ColumnPath 
parts
+            // which contain the actual name from the file. See the Parquet 
LogicalTypes
+            // spec for the backward-compatibility rules:
+            // 
https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#lists
+            if depth + 2 < parquet_path.len() {
+                debug_assert_eq!(
+                    element_field.name(),
+                    &parquet_path[depth + 2],
+                    "Arrow list element name '{}' does not match Parquet path 
segment '{}'",
+                    element_field.name(),
+                    &parquet_path[depth + 2]
+                );
+                let (new_element, changed) =
+                    coerce_field_at_path(element_field, parquet_path, depth + 
2, iceberg_schema);
+                if changed {
+                    let new_type = match field.data_type() {
+                        DataType::List(_) => DataType::List(new_element),
+                        DataType::LargeList(_) => 
DataType::LargeList(new_element),
+                        _ => unreachable!(),
+                    };
+                    let new_field = Arc::new(
+                        Field::new(field.name(), new_type, field.is_nullable())
+                            .with_metadata(field.metadata().clone()),
+                    );
+                    return (new_field, true);
+                }
+            }

Review Comment:
   we should consider logging on the else branch to indicate it was not of 
expected type and therefore not fixed up.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to