This is an automated email from the ASF dual-hosted git repository.
liurenjie1024 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-rust.git
The following commit(s) were added to refs/heads/main by this push:
new ef851524f feat(arrow): Convert Arrow schema to Iceberg schema with
auto assigned field ids (#1928)
ef851524f is described below
commit ef851524f16a604c05683051d732fa523b6e3bdc
Author: Shawn Chang <[email protected]>
AuthorDate: Mon Dec 29 17:24:56 2025 -0800
feat(arrow): Convert Arrow schema to Iceberg schema with auto assigned
field ids (#1928)
## Which issue does this PR close?
- Closes #1927
## What changes are included in this PR?
- Modified ArrowSchemaConverter to enable id reassignment
- Added a new pub helper: `arrow_schema_to_schema_auto_assign_ids`
## Are these changes tested?
Added unit tests.
---
crates/iceberg/src/arrow/schema.rs | 233 ++++++++++++++++++++--
crates/iceberg/src/arrow/value.rs | 4 +-
crates/iceberg/src/spec/mod.rs | 1 +
crates/iceberg/src/spec/schema/mod.rs | 4 +-
crates/iceberg/src/spec/table_metadata_builder.rs | 2 +-
5 files changed, 226 insertions(+), 18 deletions(-)
diff --git a/crates/iceberg/src/arrow/schema.rs
b/crates/iceberg/src/arrow/schema.rs
index 4f4f083c7..9ee7897cb 100644
--- a/crates/iceberg/src/arrow/schema.rs
+++ b/crates/iceberg/src/arrow/schema.rs
@@ -35,8 +35,8 @@ use uuid::Uuid;
use crate::error::Result;
use crate::spec::{
- Datum, ListType, MapType, NestedField, NestedFieldRef, PrimitiveLiteral,
PrimitiveType, Schema,
- SchemaVisitor, StructType, Type,
+ Datum, FIRST_FIELD_ID, ListType, MapType, NestedField, NestedFieldRef,
PrimitiveLiteral,
+ PrimitiveType, Schema, SchemaVisitor, StructType, Type,
};
use crate::{Error, ErrorKind};
@@ -221,6 +221,19 @@ pub fn arrow_schema_to_schema(schema: &ArrowSchema) ->
Result<Schema> {
visit_schema(schema, &mut visitor)
}
+/// Convert Arrow schema to Iceberg schema with automatically assigned field
IDs.
+///
+/// Unlike [`arrow_schema_to_schema`], this function does not require field
IDs in the Arrow
+/// schema metadata. Instead, it automatically assigns unique field IDs
starting from 1,
+/// following Iceberg's field ID assignment rules.
+///
+/// This is useful when converting Arrow schemas that don't originate from
Iceberg tables,
+/// such as schemas from DataFusion or other Arrow-based systems.
+pub fn arrow_schema_to_schema_auto_assign_ids(schema: &ArrowSchema) ->
Result<Schema> {
+ let mut visitor =
ArrowSchemaConverter::new_with_field_ids_from(FIRST_FIELD_ID);
+ visit_schema(schema, &mut visitor)
+}
+
/// Convert Arrow type to iceberg type.
pub fn arrow_type_to_type(ty: &DataType) -> Result<Type> {
let mut visitor = ArrowSchemaConverter::new();
@@ -229,7 +242,7 @@ pub fn arrow_type_to_type(ty: &DataType) -> Result<Type> {
const ARROW_FIELD_DOC_KEY: &str = "doc";
-pub(super) fn get_field_id(field: &Field) -> Result<i32> {
+pub(super) fn get_field_id_from_metadata(field: &Field) -> Result<i32> {
if let Some(value) = field.metadata().get(PARQUET_FIELD_ID_META_KEY) {
return value.parse::<i32>().map_err(|e| {
Error::new(
@@ -253,19 +266,55 @@ fn get_field_doc(field: &Field) -> Option<String> {
None
}
-struct ArrowSchemaConverter;
+struct ArrowSchemaConverter {
+ /// When set, the schema builder will reassign field IDs starting from
this value
+ /// using level-order traversal (breadth-first).
+ reassign_field_ids_from: Option<i32>,
+ /// Generates unique placeholder IDs for fields before reassignment.
+ /// Required because `ReassignFieldIds` builds an old-to-new ID mapping
+ /// that expects unique input IDs.
+ next_field_id: i32,
+}
impl ArrowSchemaConverter {
fn new() -> Self {
- Self {}
+ Self {
+ reassign_field_ids_from: None,
+ next_field_id: 0,
+ }
+ }
+
+ fn new_with_field_ids_from(start_from: i32) -> Self {
+ Self {
+ reassign_field_ids_from: Some(start_from),
+ next_field_id: 0,
+ }
+ }
+
+ fn get_field_id(&mut self, field: &Field) -> Result<i32> {
+ if self.reassign_field_ids_from.is_some() {
+ // Field IDs will be reassigned by the schema builder.
+ // We need unique temporary IDs because ReassignFieldIds builds an
+ // old->new ID mapping that requires unique input IDs.
+ let temp_id = self.next_field_id;
+ self.next_field_id += 1;
+ Ok(temp_id)
+ } else {
+ // Get field ID from arrow field metadata
+ get_field_id_from_metadata(field)
+ }
}
- fn convert_fields(fields: &Fields, field_results: &[Type]) ->
Result<Vec<NestedFieldRef>> {
+ fn convert_fields(
+ &mut self,
+ fields: &Fields,
+ field_results: &[Type],
+ ) -> Result<Vec<NestedFieldRef>> {
let mut results = Vec::with_capacity(fields.len());
for i in 0..fields.len() {
let field = &fields[i];
let field_type = &field_results[i];
- let id = get_field_id(field)?;
+ let id = self.get_field_id(field)?;
let doc = get_field_doc(field);
let nested_field = NestedField {
id,
@@ -287,13 +336,16 @@ impl ArrowSchemaVisitor for ArrowSchemaConverter {
type U = Schema;
fn schema(&mut self, schema: &ArrowSchema, values: Vec<Self::T>) ->
Result<Self::U> {
- let fields = Self::convert_fields(schema.fields(), &values)?;
- let builder = Schema::builder().with_fields(fields);
+ let fields = self.convert_fields(schema.fields(), &values)?;
+ let mut builder = Schema::builder().with_fields(fields);
+ if let Some(start_from) = self.reassign_field_ids_from {
+ builder = builder.with_reassigned_field_ids(start_from)
+ }
builder.build()
}
fn r#struct(&mut self, fields: &Fields, results: Vec<Self::T>) ->
Result<Self::T> {
- let fields = Self::convert_fields(fields, &results)?;
+ let fields = self.convert_fields(fields, &results)?;
Ok(Type::Struct(StructType::new(fields)))
}
@@ -310,7 +362,7 @@ impl ArrowSchemaVisitor for ArrowSchemaConverter {
}
};
- let id = get_field_id(element_field)?;
+ let id = self.get_field_id(element_field)?;
let doc = get_field_doc(element_field);
let mut element_field =
NestedField::list_element(id, value.clone(),
!element_field.is_nullable());
@@ -335,7 +387,7 @@ impl ArrowSchemaVisitor for ArrowSchemaConverter {
let key_field = &fields[0];
let value_field = &fields[1];
- let key_id = get_field_id(key_field)?;
+ let key_id = self.get_field_id(key_field)?;
let key_doc = get_field_doc(key_field);
let mut key_field = NestedField::map_key_element(key_id,
key_value.clone());
if let Some(doc) = key_doc {
@@ -343,7 +395,7 @@ impl ArrowSchemaVisitor for ArrowSchemaConverter {
}
let key_field = Arc::new(key_field);
- let value_id = get_field_id(value_field)?;
+ let value_id = self.get_field_id(value_field)?;
let value_doc = get_field_doc(value_field);
let mut value_field = NestedField::map_value_element(
value_id,
@@ -1932,4 +1984,159 @@ mod tests {
assert_eq!(array.value(0), [66u8; 16]);
}
}
+
+ #[test]
+ fn test_arrow_schema_to_schema_with_field_id() {
+ // Create a complex Arrow schema without field ID metadata
+ // Including: primitives, list, nested struct, map, and nested list of
structs
+ let arrow_schema = ArrowSchema::new(vec![
+ Field::new("id", DataType::Int64, false),
+ Field::new("name", DataType::Utf8, true),
+ Field::new("price", DataType::Decimal128(10, 2), false),
+ Field::new(
+ "created_at",
+ DataType::Timestamp(TimeUnit::Microsecond,
Some("+00:00".into())),
+ true,
+ ),
+ Field::new(
+ "tags",
+ DataType::List(Arc::new(Field::new("item", DataType::Utf8,
true))),
+ true,
+ ),
+ Field::new(
+ "address",
+ DataType::Struct(Fields::from(vec![
+ Field::new("street", DataType::Utf8, true),
+ Field::new("city", DataType::Utf8, false),
+ Field::new("zip", DataType::Int32, true),
+ ])),
+ true,
+ ),
+ Field::new(
+ "attributes",
+ DataType::Map(
+ Arc::new(Field::new(
+ DEFAULT_MAP_FIELD_NAME,
+ DataType::Struct(Fields::from(vec![
+ Field::new("key", DataType::Utf8, false),
+ Field::new("value", DataType::Utf8, true),
+ ])),
+ false,
+ )),
+ false,
+ ),
+ true,
+ ),
+ Field::new(
+ "orders",
+ DataType::List(Arc::new(Field::new(
+ "element",
+ DataType::Struct(Fields::from(vec![
+ Field::new("order_id", DataType::Int64, false),
+ Field::new("amount", DataType::Float64, false),
+ ])),
+ true,
+ ))),
+ true,
+ ),
+ ]);
+
+ let schema =
arrow_schema_to_schema_auto_assign_ids(&arrow_schema).unwrap();
+
+ // Build expected schema with exact field IDs following level-order
assignment:
+ // Level 0: id=1, name=2, price=3, created_at=4, tags=5, address=6,
attributes=7, orders=8
+ // Level 1: tags.element=9, address.{street=10,city=11,zip=12},
attributes.{key=13,value=14}, orders.element=15
+ // Level 2: orders.element.{order_id=16,amount=17}
+ let expected = Schema::builder()
+ .with_fields(vec![
+ NestedField::required(1, "id",
Type::Primitive(PrimitiveType::Long)).into(),
+ NestedField::optional(2, "name",
Type::Primitive(PrimitiveType::String)).into(),
+ NestedField::required(
+ 3,
+ "price",
+ Type::Primitive(PrimitiveType::Decimal {
+ precision: 10,
+ scale: 2,
+ }),
+ )
+ .into(),
+ NestedField::optional(4, "created_at",
Type::Primitive(PrimitiveType::Timestamptz))
+ .into(),
+ NestedField::optional(
+ 5,
+ "tags",
+ Type::List(ListType {
+ element_field: NestedField::list_element(
+ 9,
+ Type::Primitive(PrimitiveType::String),
+ false,
+ )
+ .into(),
+ }),
+ )
+ .into(),
+ NestedField::optional(
+ 6,
+ "address",
+ Type::Struct(StructType::new(vec![
+ NestedField::optional(10, "street",
Type::Primitive(PrimitiveType::String))
+ .into(),
+ NestedField::required(11, "city",
Type::Primitive(PrimitiveType::String))
+ .into(),
+ NestedField::optional(12, "zip",
Type::Primitive(PrimitiveType::Int))
+ .into(),
+ ])),
+ )
+ .into(),
+ NestedField::optional(
+ 7,
+ "attributes",
+ Type::Map(MapType {
+ key_field: NestedField::map_key_element(
+ 13,
+ Type::Primitive(PrimitiveType::String),
+ )
+ .into(),
+ value_field: NestedField::map_value_element(
+ 14,
+ Type::Primitive(PrimitiveType::String),
+ false,
+ )
+ .into(),
+ }),
+ )
+ .into(),
+ NestedField::optional(
+ 8,
+ "orders",
+ Type::List(ListType {
+ element_field: NestedField::list_element(
+ 15,
+ Type::Struct(StructType::new(vec![
+ NestedField::required(
+ 16,
+ "order_id",
+ Type::Primitive(PrimitiveType::Long),
+ )
+ .into(),
+ NestedField::required(
+ 17,
+ "amount",
+ Type::Primitive(PrimitiveType::Double),
+ )
+ .into(),
+ ])),
+ false,
+ )
+ .into(),
+ }),
+ )
+ .into(),
+ ])
+ .build()
+ .unwrap();
+
+ pretty_assertions::assert_eq!(schema, expected);
+ assert_eq!(schema.highest_field_id(), 17);
+ }
}
diff --git a/crates/iceberg/src/arrow/value.rs
b/crates/iceberg/src/arrow/value.rs
index 190aba08e..30b47d83f 100644
--- a/crates/iceberg/src/arrow/value.rs
+++ b/crates/iceberg/src/arrow/value.rs
@@ -27,7 +27,7 @@ use arrow_buffer::NullBuffer;
use arrow_schema::{DataType, FieldRef};
use uuid::Uuid;
-use super::get_field_id;
+use super::get_field_id_from_metadata;
use crate::spec::{
ListType, Literal, Map, MapType, NestedField, PartnerAccessor,
PrimitiveLiteral, PrimitiveType,
SchemaWithPartnerVisitor, Struct, StructType, Type,
visit_struct_with_partner,
@@ -450,7 +450,7 @@ impl FieldMatchMode {
/// Determines if an Arrow field matches an Iceberg field based on the
matching mode.
pub fn match_field(&self, arrow_field: &FieldRef, iceberg_field:
&NestedField) -> bool {
match self {
- FieldMatchMode::Id => get_field_id(arrow_field)
+ FieldMatchMode::Id => get_field_id_from_metadata(arrow_field)
.map(|id| id == iceberg_field.id)
.unwrap_or(false),
FieldMatchMode::Name => arrow_field.name() == &iceberg_field.name,
diff --git a/crates/iceberg/src/spec/mod.rs b/crates/iceberg/src/spec/mod.rs
index 44b35e5a6..a2b540f08 100644
--- a/crates/iceberg/src/spec/mod.rs
+++ b/crates/iceberg/src/spec/mod.rs
@@ -49,6 +49,7 @@ pub use snapshot_summary::*;
pub use sort::*;
pub use statistic_file::*;
pub use table_metadata::*;
+pub(crate) use table_metadata_builder::FIRST_FIELD_ID;
pub use table_properties::*;
pub use transform::*;
pub use values::*;
diff --git a/crates/iceberg/src/spec/schema/mod.rs
b/crates/iceberg/src/spec/schema/mod.rs
index 7080b6e70..13ad41818 100644
--- a/crates/iceberg/src/spec/schema/mod.rs
+++ b/crates/iceberg/src/spec/schema/mod.rs
@@ -102,8 +102,8 @@ impl SchemaBuilder {
/// Reassignment starts from the field-id specified in `start_from`
(inclusive).
///
/// All specified aliases and identifier fields will be updated to the new
field-ids.
- pub(crate) fn with_reassigned_field_ids(mut self, start_from: u32) -> Self
{
- self.reassign_field_ids_from =
Some(start_from.try_into().unwrap_or(i32::MAX));
+ pub(crate) fn with_reassigned_field_ids(mut self, start_from: i32) -> Self
{
+ self.reassign_field_ids_from = Some(start_from);
self
}
diff --git a/crates/iceberg/src/spec/table_metadata_builder.rs
b/crates/iceberg/src/spec/table_metadata_builder.rs
index eee4fec34..3db327d48 100644
--- a/crates/iceberg/src/spec/table_metadata_builder.rs
+++ b/crates/iceberg/src/spec/table_metadata_builder.rs
@@ -31,7 +31,7 @@ use crate::error::{Error, ErrorKind, Result};
use crate::spec::{EncryptedKey, INITIAL_ROW_ID,
MIN_FORMAT_VERSION_ROW_LINEAGE};
use crate::{TableCreation, TableUpdate};
-const FIRST_FIELD_ID: u32 = 1;
+pub(crate) const FIRST_FIELD_ID: i32 = 1;
/// Manipulating table metadata.
///