mbutrovich commented on code in PR #2301:
URL: https://github.com/apache/iceberg-rust/pull/2301#discussion_r3017622825
##########
crates/iceberg/src/arrow/reader.rs:
##########
@@ -1232,6 +1253,121 @@ fn add_fallback_field_ids_to_arrow_schema(arrow_schema:
&ArrowSchemaRef) -> Arc<
))
}
+/// Coerce Arrow schema types for INT96 columns to match the Iceberg table
schema.
+///
+/// arrow-rs defaults INT96 to `Timestamp(Nanosecond)`, which overflows i64
for dates outside
+/// ~1677-2262. We use arrow-rs's schema hint mechanism to read INT96 at the
resolution
+/// specified by the Iceberg schema (`timestamp` → microsecond, `timestamp_ns`
→ nanosecond).
+///
+/// Iceberg Java handles this differently: it bypasses parquet-mr with a
custom column reader
+/// (`GenericParquetReaders.TimestampInt96Reader`). We achieve the same result
via schema hints.
+///
+/// References:
+/// - Iceberg spec primitive types:
<https://iceberg.apache.org/spec/#primitive-types>
+/// - arrow-rs schema hint support:
<https://github.com/apache/arrow-rs/pull/7285>
+fn coerce_int96_timestamps(
+ parquet_schema: &SchemaDescriptor,
+ arrow_schema: &ArrowSchemaRef,
+ iceberg_schema: &Schema,
+) -> Option<Arc<ArrowSchema>> {
+ use arrow_schema::{DataType, Field, Fields, TimeUnit};
+ use parquet::basic::Type as PhysicalType;
+
+ let int96_paths: HashSet<String> = parquet_schema
+ .columns()
+ .iter()
+ .filter(|col| col.physical_type() == PhysicalType::INT96)
+ .map(|col| col.path().string())
+ .collect();
+
+ if int96_paths.is_empty() {
+ return None;
+ }
+
+ fn coerce_field(
+ field: &FieldRef,
+ path_parts: &[&str],
+ int96_paths: &HashSet<String>,
+ iceberg_schema: &Schema,
+ ) -> FieldRef {
+ match field.data_type() {
+ DataType::Struct(fields) => {
+ let new_fields: Vec<FieldRef> = fields
+ .iter()
+ .map(|child| {
+ let mut child_path = path_parts.to_vec();
+ child_path.push(child.name().as_str());
+ coerce_field(child, &child_path, int96_paths,
iceberg_schema)
+ })
+ .collect();
+ Arc::new(
+ Field::new(
+ field.name(),
+ DataType::Struct(Fields::from(new_fields)),
+ field.is_nullable(),
+ )
+ .with_metadata(field.metadata().clone()),
+ )
+ }
+ DataType::Timestamp(TimeUnit::Nanosecond, tz)
+ if int96_paths.contains(&path_parts.join(".")) =>
+ {
+ let target_unit = field
+ .metadata()
+ .get(PARQUET_FIELD_ID_META_KEY)
+ .and_then(|id_str| id_str.parse::<i32>().ok())
+ .and_then(|field_id| iceberg_schema.field_by_id(field_id))
+ .and_then(|f| match &*f.field_type {
+ Type::Primitive(PrimitiveType::Timestamp |
PrimitiveType::Timestamptz) => {
+ Some(TimeUnit::Microsecond)
+ }
+ Type::Primitive(
+ PrimitiveType::TimestampNs |
PrimitiveType::TimestamptzNs,
+ ) => Some(TimeUnit::Nanosecond),
+ _ => None,
+ })
+ // Iceberg Java reads INT96 as microseconds by default
+ .unwrap_or(TimeUnit::Microsecond);
+
+ if target_unit == TimeUnit::Nanosecond {
+ return Arc::clone(field);
+ }
+
+ Arc::new(
+ Field::new(
+ field.name(),
+ DataType::Timestamp(target_unit, tz.clone()),
+ field.is_nullable(),
+ )
+ .with_metadata(field.metadata().clone()),
+ )
+ }
+ _ => Arc::clone(field),
+ }
+ }
+
+ let coerced_fields: Vec<FieldRef> = arrow_schema
+ .fields()
+ .iter()
+ .map(|field| {
Review Comment:
Will do, thanks!
##########
crates/iceberg/src/arrow/reader.rs:
##########
@@ -1232,6 +1253,121 @@ fn add_fallback_field_ids_to_arrow_schema(arrow_schema:
&ArrowSchemaRef) -> Arc<
))
}
+/// Coerce Arrow schema types for INT96 columns to match the Iceberg table
schema.
+///
+/// arrow-rs defaults INT96 to `Timestamp(Nanosecond)`, which overflows i64
for dates outside
+/// ~1677-2262. We use arrow-rs's schema hint mechanism to read INT96 at the
resolution
+/// specified by the Iceberg schema (`timestamp` → microsecond, `timestamp_ns`
→ nanosecond).
+///
+/// Iceberg Java handles this differently: it bypasses parquet-mr with a
custom column reader
+/// (`GenericParquetReaders.TimestampInt96Reader`). We achieve the same result
via schema hints.
+///
+/// References:
+/// - Iceberg spec primitive types:
<https://iceberg.apache.org/spec/#primitive-types>
+/// - arrow-rs schema hint support:
<https://github.com/apache/arrow-rs/pull/7285>
+fn coerce_int96_timestamps(
+ parquet_schema: &SchemaDescriptor,
+ arrow_schema: &ArrowSchemaRef,
+ iceberg_schema: &Schema,
+) -> Option<Arc<ArrowSchema>> {
+ use arrow_schema::{DataType, Field, Fields, TimeUnit};
+ use parquet::basic::Type as PhysicalType;
+
+ let int96_paths: HashSet<String> = parquet_schema
+ .columns()
+ .iter()
+ .filter(|col| col.physical_type() == PhysicalType::INT96)
+ .map(|col| col.path().string())
+ .collect();
+
+ if int96_paths.is_empty() {
+ return None;
+ }
+
+ fn coerce_field(
+ field: &FieldRef,
+ path_parts: &[&str],
+ int96_paths: &HashSet<String>,
+ iceberg_schema: &Schema,
+ ) -> FieldRef {
+ match field.data_type() {
+ DataType::Struct(fields) => {
Review Comment:
Good catch, let me handle those and add respective tests.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]