emkornfield commented on code in PR #2301:
URL: https://github.com/apache/iceberg-rust/pull/2301#discussion_r3017865648
##########
crates/iceberg/src/arrow/reader.rs:
##########
@@ -1232,6 +1257,202 @@ fn add_fallback_field_ids_to_arrow_schema(arrow_schema:
&ArrowSchemaRef) -> Arc<
))
}
+/// Coerce Arrow schema types for INT96 columns to match the Iceberg table
schema.
+///
+/// arrow-rs defaults INT96 to `Timestamp(Nanosecond)`, which overflows i64
for dates outside
+/// ~1677-2262. We use arrow-rs's schema hint mechanism to read INT96 at the
resolution
+/// specified by the Iceberg schema (`timestamp` → microsecond, `timestamp_ns`
→ nanosecond).
+///
+/// Iceberg Java handles this differently: it bypasses parquet-mr with a
custom column reader
+/// (`GenericParquetReaders.TimestampInt96Reader`). We achieve the same result
via schema hints.
+///
+/// References:
+/// - Iceberg spec primitive types:
<https://iceberg.apache.org/spec/#primitive-types>
+/// - arrow-rs schema hint support:
<https://github.com/apache/arrow-rs/pull/7285>
+fn coerce_int96_timestamps(
+ parquet_schema: &SchemaDescriptor,
+ arrow_schema: &ArrowSchemaRef,
+ iceberg_schema: &Schema,
+) -> Option<Arc<ArrowSchema>> {
+ let int96_paths: Vec<ColumnPath> = parquet_schema
+ .columns()
+ .iter()
+ .filter(|col| col.physical_type() == PhysicalType::INT96)
+ .map(|col| col.path().clone())
+ .collect();
+
+ if int96_paths.is_empty() {
+ return None;
+ }
+
+ let mut fields: Vec<FieldRef> =
arrow_schema.fields().iter().cloned().collect();
+ let mut any_changed = false;
+
+ for path in &int96_paths {
+ let parts = path.parts();
+ if let Some(idx) = fields.iter().position(|f| f.name() == &parts[0]) {
+ let (new_field, changed) = coerce_field_at_path(&fields[idx],
parts, 0, iceberg_schema);
+ if changed {
+ fields[idx] = new_field;
+ any_changed = true;
+ }
+ }
+ }
+
+ if any_changed {
+ Some(Arc::new(ArrowSchema::new_with_metadata(
+ fields,
+ arrow_schema.metadata().clone(),
+ )))
+ } else {
+ None
+ }
+}
+
+/// Navigate an Arrow field tree using a Parquet column path and coerce the
leaf
+/// INT96 `Timestamp(Nanosecond)` to the resolution specified by the Iceberg
schema.
+///
+/// Parquet column paths include intermediate group names for nested types
(e.g. the
+/// repeated group in LIST encoding). This function accounts for those extra
levels
+/// when descending through List, LargeList, and Map Arrow types.
+fn coerce_field_at_path(
+ field: &FieldRef,
+ parquet_path: &[String],
+ depth: usize,
+ iceberg_schema: &Schema,
+) -> (FieldRef, bool) {
+ if depth == parquet_path.len() - 1 {
+ return coerce_timestamp_field(field, iceberg_schema);
+ }
+
+ match field.data_type() {
+ DataType::Struct(fields) => {
+ let child_name = &parquet_path[depth + 1];
+ let mut new_fields: Vec<FieldRef> =
fields.iter().cloned().collect();
+ let mut changed = false;
+
+ if let Some(idx) = new_fields.iter().position(|f| f.name() ==
child_name) {
+ let (new_child, child_changed) =
+ coerce_field_at_path(&new_fields[idx], parquet_path, depth
+ 1, iceberg_schema);
+ if child_changed {
+ new_fields[idx] = new_child;
+ changed = true;
+ }
+ }
+
+ if changed {
+ let new_field = Arc::new(
+ Field::new(
+ field.name(),
+ DataType::Struct(Fields::from(new_fields)),
+ field.is_nullable(),
+ )
+ .with_metadata(field.metadata().clone()),
+ );
+ (new_field, true)
+ } else {
+ (Arc::clone(field), false)
+ }
+ }
+ DataType::List(element_field) | DataType::LargeList(element_field) => {
+ // Parquet 3-level LIST encoding inserts a repeated group between
the list
+ // and its element, e.g. `my_list.list.element` where `list` is the
+ // intermediate group. The group name varies across writers — the
spec says
+ // "list" but legacy data uses "element", "array",
`<parent>_tuple`, etc.
+ // We skip it (depth + 1) since we navigate by Parquet ColumnPath
parts
+ // which contain the actual name from the file. See the Parquet
LogicalTypes
+ // spec for the backward-compatibility rules:
+ //
https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#lists
+ if depth + 2 < parquet_path.len() {
+ debug_assert_eq!(
Review Comment:
This is a can of worms, I think. I don't have any great suggestions for
making this robust, but I can all but guarantee there will be issues for some set of
Parquet files.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]