This is an automated email from the ASF dual-hosted git repository.

honahx pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-python.git


The following commit(s) were added to refs/heads/main by this push:
     new 70972d9  Apply Name mapping (#219)
70972d9 is described below

commit 70972d9436f24fd899a7a2c8600c96bb4d282e12
Author: Sung Yun <[email protected]>
AuthorDate: Thu Jan 18 20:30:01 2024 -0500

    Apply Name mapping (#219)
---
 pyiceberg/io/pyarrow.py          | 229 ++++++++++++++++++++---------
 pyiceberg/table/__init__.py      |  13 ++
 pyiceberg/table/name_mapping.py  |   2 +
 tests/io/test_pyarrow_visitor.py | 305 +++++++++++++++++++++++++++++++++++++++
 4 files changed, 484 insertions(+), 65 deletions(-)

diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py
index b4988c6..035f5e8 100644
--- a/pyiceberg/io/pyarrow.py
+++ b/pyiceberg/io/pyarrow.py
@@ -124,6 +124,7 @@ from pyiceberg.schema import (
     visit_with_partner,
 )
 from pyiceberg.table import WriteTask
+from pyiceberg.table.name_mapping import NameMapping
 from pyiceberg.transforms import TruncateTransform
 from pyiceberg.typedef import EMPTY_DICT, Properties, Record
 from pyiceberg.types import (
@@ -164,6 +165,9 @@ ICEBERG_SCHEMA = b"iceberg.schema"
 # The PARQUET: in front means that it is Parquet specific, in this case the 
field_id
 PYARROW_PARQUET_FIELD_ID_KEY = b"PARQUET:field_id"
 PYARROW_FIELD_DOC_KEY = b"doc"
+LIST_ELEMENT_NAME = "element"
+MAP_KEY_NAME = "key"
+MAP_VALUE_NAME = "value"
 
 T = TypeVar("T")
 
@@ -631,8 +635,16 @@ def _combine_positional_deletes(positional_deletes: 
List[pa.ChunkedArray], rows:
     return np.setdiff1d(np.arange(rows), all_chunks, assume_unique=False)
 
 
-def pyarrow_to_schema(schema: pa.Schema) -> Schema:
-    visitor = _ConvertToIceberg()
+def pyarrow_to_schema(schema: pa.Schema, name_mapping: Optional[NameMapping] = 
None) -> Schema:
+    has_ids = visit_pyarrow(schema, _HasIds())
+    if has_ids:
+        visitor = _ConvertToIceberg()
+    elif name_mapping is not None:
+        visitor = _ConvertToIceberg(name_mapping=name_mapping)
+    else:
+        raise ValueError(
+            "Parquet file does not have field-ids and the Iceberg table does 
not have 'schema.name-mapping.default' defined"
+        )
     return visit_pyarrow(schema, visitor)
 
 
@@ -653,50 +665,47 @@ def visit_pyarrow(obj: Union[pa.DataType, pa.Schema], 
visitor: PyArrowSchemaVisi
 
 
 @visit_pyarrow.register(pa.Schema)
-def _(obj: pa.Schema, visitor: PyArrowSchemaVisitor[T]) -> Optional[T]:
-    struct_results: List[Optional[T]] = []
-    for field in obj:
-        visitor.before_field(field)
-        struct_result = visit_pyarrow(field.type, visitor)
-        visitor.after_field(field)
-        struct_results.append(struct_result)
-
-    return visitor.schema(obj, struct_results)
+def _(obj: pa.Schema, visitor: PyArrowSchemaVisitor[T]) -> T:
+    return visitor.schema(obj, visit_pyarrow(pa.struct(obj), visitor))
 
 
 @visit_pyarrow.register(pa.StructType)
-def _(obj: pa.StructType, visitor: PyArrowSchemaVisitor[T]) -> Optional[T]:
-    struct_results: List[Optional[T]] = []
+def _(obj: pa.StructType, visitor: PyArrowSchemaVisitor[T]) -> T:
+    results = []
+
     for field in obj:
         visitor.before_field(field)
-        struct_result = visit_pyarrow(field.type, visitor)
+        result = visit_pyarrow(field.type, visitor)
+        results.append(visitor.field(field, result))
         visitor.after_field(field)
-        struct_results.append(struct_result)
 
-    return visitor.struct(obj, struct_results)
+    return visitor.struct(obj, results)
 
 
 @visit_pyarrow.register(pa.ListType)
-def _(obj: pa.ListType, visitor: PyArrowSchemaVisitor[T]) -> Optional[T]:
-    visitor.before_field(obj.value_field)
-    list_result = visit_pyarrow(obj.value_field.type, visitor)
-    visitor.after_field(obj.value_field)
-    return visitor.list(obj, list_result)
+def _(obj: pa.ListType, visitor: PyArrowSchemaVisitor[T]) -> T:
+    visitor.before_list_element(obj.value_field)
+    result = visit_pyarrow(obj.value_type, visitor)
+    visitor.after_list_element(obj.value_field)
+
+    return visitor.list(obj, result)
 
 
 @visit_pyarrow.register(pa.MapType)
-def _(obj: pa.MapType, visitor: PyArrowSchemaVisitor[T]) -> Optional[T]:
-    visitor.before_field(obj.key_field)
-    key_result = visit_pyarrow(obj.key_field.type, visitor)
-    visitor.after_field(obj.key_field)
-    visitor.before_field(obj.item_field)
-    value_result = visit_pyarrow(obj.item_field.type, visitor)
-    visitor.after_field(obj.item_field)
+def _(obj: pa.MapType, visitor: PyArrowSchemaVisitor[T]) -> T:
+    visitor.before_map_key(obj.key_field)
+    key_result = visit_pyarrow(obj.key_type, visitor)
+    visitor.after_map_key(obj.key_field)
+
+    visitor.before_map_value(obj.item_field)
+    value_result = visit_pyarrow(obj.item_type, visitor)
+    visitor.after_map_value(obj.item_field)
+
     return visitor.map(obj, key_result, value_result)
 
 
 @visit_pyarrow.register(pa.DataType)
-def _(obj: pa.DataType, visitor: PyArrowSchemaVisitor[T]) -> Optional[T]:
+def _(obj: pa.DataType, visitor: PyArrowSchemaVisitor[T]) -> T:
     if pa.types.is_nested(obj):
         raise TypeError(f"Expected primitive type, got: {type(obj)}")
     return visitor.primitive(obj)
@@ -709,24 +718,46 @@ class PyArrowSchemaVisitor(Generic[T], ABC):
     def after_field(self, field: pa.Field) -> None:
         """Override this method to perform an action immediately after 
visiting a field."""
 
+    def before_list_element(self, element: pa.Field) -> None:
+        """Override this method to perform an action immediately before 
visiting an element within a ListType."""
+
+    def after_list_element(self, element: pa.Field) -> None:
+        """Override this method to perform an action immediately after 
visiting an element within a ListType."""
+
+    def before_map_key(self, key: pa.Field) -> None:
+        """Override this method to perform an action immediately before 
visiting a key within a MapType."""
+
+    def after_map_key(self, key: pa.Field) -> None:
+        """Override this method to perform an action immediately after 
visiting a key within a MapType."""
+
+    def before_map_value(self, value: pa.Field) -> None:
+        """Override this method to perform an action immediately before 
visiting a value within a MapType."""
+
+    def after_map_value(self, value: pa.Field) -> None:
+        """Override this method to perform an action immediately after 
visiting a value within a MapType."""
+
     @abstractmethod
-    def schema(self, schema: pa.Schema, field_results: List[Optional[T]]) -> 
Optional[T]:
+    def schema(self, schema: pa.Schema, struct_result: T) -> T:
         """Visit a schema."""
 
     @abstractmethod
-    def struct(self, struct: pa.StructType, field_results: List[Optional[T]]) 
-> Optional[T]:
+    def struct(self, struct: pa.StructType, field_results: List[T]) -> T:
         """Visit a struct."""
 
     @abstractmethod
-    def list(self, list_type: pa.ListType, element_result: Optional[T]) -> 
Optional[T]:
+    def field(self, field: pa.Field, field_result: T) -> T:
+        """Visit a field."""
+
+    @abstractmethod
+    def list(self, list_type: pa.ListType, element_result: T) -> T:
         """Visit a list."""
 
     @abstractmethod
-    def map(self, map_type: pa.MapType, key_result: Optional[T], value_result: 
Optional[T]) -> Optional[T]:
+    def map(self, map_type: pa.MapType, key_result: T, value_result: T) -> T:
         """Visit a map."""
 
     @abstractmethod
-    def primitive(self, primitive: pa.DataType) -> Optional[T]:
+    def primitive(self, primitive: pa.DataType) -> T:
         """Visit a primitive type."""
 
 
@@ -738,42 +769,84 @@ def _get_field_id(field: pa.Field) -> Optional[int]:
     )
 
 
-class _ConvertToIceberg(PyArrowSchemaVisitor[Union[IcebergType, Schema]]):
-    def _convert_fields(self, arrow_fields: Iterable[pa.Field], field_results: 
List[Optional[IcebergType]]) -> List[NestedField]:
-        fields = []
-        for i, field in enumerate(arrow_fields):
-            field_id = _get_field_id(field)
-            field_doc = doc_str.decode() if (field.metadata and (doc_str := 
field.metadata.get(PYARROW_FIELD_DOC_KEY))) else None
-            field_type = field_results[i]
-            if field_type is not None and field_id is not None:
-                fields.append(NestedField(field_id, field.name, field_type, 
required=not field.nullable, doc=field_doc))
-        return fields
-
-    def schema(self, schema: pa.Schema, field_results: 
List[Optional[IcebergType]]) -> Schema:
-        return Schema(*self._convert_fields(schema, field_results))
-
-    def struct(self, struct: pa.StructType, field_results: 
List[Optional[IcebergType]]) -> IcebergType:
-        return StructType(*self._convert_fields(struct, field_results))
-
-    def list(self, list_type: pa.ListType, element_result: 
Optional[IcebergType]) -> Optional[IcebergType]:
+class _HasIds(PyArrowSchemaVisitor[bool]):
+    def schema(self, schema: pa.Schema, struct_result: bool) -> bool:
+        return struct_result
+
+    def struct(self, struct: pa.StructType, field_results: List[bool]) -> bool:
+        return all(field_results)
+
+    def field(self, field: pa.Field, field_result: bool) -> bool:
+        return all([_get_field_id(field) is not None, field_result])
+
+    def list(self, list_type: pa.ListType, element_result: bool) -> bool:
         element_field = list_type.value_field
         element_id = _get_field_id(element_field)
-        if element_result is not None and element_id is not None:
-            return ListType(element_id, element_result, element_required=not 
element_field.nullable)
-        return None
+        return element_result and element_id is not None
 
-    def map(
-        self, map_type: pa.MapType, key_result: Optional[IcebergType], 
value_result: Optional[IcebergType]
-    ) -> Optional[IcebergType]:
+    def map(self, map_type: pa.MapType, key_result: bool, value_result: bool) 
-> bool:
         key_field = map_type.key_field
         key_id = _get_field_id(key_field)
         value_field = map_type.item_field
         value_id = _get_field_id(value_field)
-        if key_result is not None and value_result is not None and key_id is 
not None and value_id is not None:
-            return MapType(key_id, key_result, value_id, value_result, 
value_required=not value_field.nullable)
-        return None
+        return all([key_id is not None, value_id is not None, key_result, 
value_result])
+
+    def primitive(self, primitive: pa.DataType) -> bool:
+        return True
 
-    def primitive(self, primitive: pa.DataType) -> IcebergType:
+
+class _ConvertToIceberg(PyArrowSchemaVisitor[Union[IcebergType, Schema]]):
+    """Converts PyArrowSchema to Iceberg Schema. Applies the IDs from 
name_mapping if provided."""
+
+    _field_names: List[str]
+    _name_mapping: Optional[NameMapping]
+
+    def __init__(self, name_mapping: Optional[NameMapping] = None) -> None:
+        self._field_names = []
+        self._name_mapping = name_mapping
+
+    def _current_path(self) -> str:
+        return ".".join(self._field_names)
+
+    def _field_id(self, field: pa.Field) -> int:
+        if self._name_mapping:
+            return self._name_mapping.find(self._current_path()).field_id
+        elif (field_id := _get_field_id(field)) is not None:
+            return field_id
+        else:
+            raise ValueError(f"Cannot convert {field} to Iceberg Field as 
field_id is empty.")
+
+    def schema(self, schema: pa.Schema, struct_result: StructType) -> Schema:
+        return Schema(*struct_result.fields)
+
+    def struct(self, struct: pa.StructType, field_results: List[NestedField]) 
-> StructType:
+        return StructType(*field_results)
+
+    def field(self, field: pa.Field, field_result: IcebergType) -> NestedField:
+        field_id = self._field_id(field)
+        field_doc = doc_str.decode() if (field.metadata and (doc_str := 
field.metadata.get(PYARROW_FIELD_DOC_KEY))) else None
+        field_type = field_result
+        return NestedField(field_id, field.name, field_type, required=not 
field.nullable, doc=field_doc)
+
+    def list(self, list_type: pa.ListType, element_result: IcebergType) -> 
ListType:
+        element_field = list_type.value_field
+        self._field_names.append(LIST_ELEMENT_NAME)
+        element_id = self._field_id(element_field)
+        self._field_names.pop()
+        return ListType(element_id, element_result, element_required=not 
element_field.nullable)
+
+    def map(self, map_type: pa.MapType, key_result: IcebergType, value_result: 
IcebergType) -> MapType:
+        key_field = map_type.key_field
+        self._field_names.append(MAP_KEY_NAME)
+        key_id = self._field_id(key_field)
+        self._field_names.pop()
+        value_field = map_type.item_field
+        self._field_names.append(MAP_VALUE_NAME)
+        value_id = self._field_id(value_field)
+        self._field_names.pop()
+        return MapType(key_id, key_result, value_id, value_result, 
value_required=not value_field.nullable)
+
+    def primitive(self, primitive: pa.DataType) -> PrimitiveType:
         if pa.types.is_boolean(primitive):
             return BooleanType()
         elif pa.types.is_int32(primitive):
@@ -808,6 +881,30 @@ class 
_ConvertToIceberg(PyArrowSchemaVisitor[Union[IcebergType, Schema]]):
 
         raise TypeError(f"Unsupported type: {primitive}")
 
+    def before_field(self, field: pa.Field) -> None:
+        self._field_names.append(field.name)
+
+    def after_field(self, field: pa.Field) -> None:
+        self._field_names.pop()
+
+    def before_list_element(self, element: pa.Field) -> None:
+        self._field_names.append(LIST_ELEMENT_NAME)
+
+    def after_list_element(self, element: pa.Field) -> None:
+        self._field_names.pop()
+
+    def before_map_key(self, key: pa.Field) -> None:
+        self._field_names.append(MAP_KEY_NAME)
+
+    def after_map_key(self, element: pa.Field) -> None:
+        self._field_names.pop()
+
+    def before_map_value(self, value: pa.Field) -> None:
+        self._field_names.append(MAP_VALUE_NAME)
+
+    def after_map_value(self, element: pa.Field) -> None:
+        self._field_names.pop()
+
 
 def _task_to_table(
     fs: FileSystem,
@@ -819,6 +916,7 @@ def _task_to_table(
     case_sensitive: bool,
     row_counts: List[int],
     limit: Optional[int] = None,
+    name_mapping: Optional[NameMapping] = None,
 ) -> Optional[pa.Table]:
     if limit and sum(row_counts) >= limit:
         return None
@@ -831,9 +929,9 @@ def _task_to_table(
         schema_raw = None
         if metadata := physical_schema.metadata:
             schema_raw = metadata.get(ICEBERG_SCHEMA)
-        # TODO: if field_ids are not present, Name Mapping should be 
implemented to look them up in the table schema,
-        #  see https://github.com/apache/iceberg/issues/7451
-        file_schema = Schema.model_validate_json(schema_raw) if schema_raw is 
not None else pyarrow_to_schema(physical_schema)
+        file_schema = (
+            Schema.model_validate_json(schema_raw) if schema_raw is not None 
else pyarrow_to_schema(physical_schema, name_mapping)
+        )
 
         pyarrow_filter = None
         if bound_row_filter is not AlwaysTrue():
@@ -970,6 +1068,7 @@ def project_table(
             case_sensitive,
             row_counts,
             limit,
+            table.name_mapping(),
         )
         for task in tasks
     ]
diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py
index 7c1b0bd..057dd84 100644
--- a/pyiceberg/table/__init__.py
+++ b/pyiceberg/table/__init__.py
@@ -81,6 +81,12 @@ from pyiceberg.table.metadata import (
     TableMetadata,
     TableMetadataUtil,
 )
+from pyiceberg.table.name_mapping import (
+    SCHEMA_NAME_MAPPING_DEFAULT,
+    NameMapping,
+    create_mapping_from_schema,
+    parse_mapping_from_json,
+)
 from pyiceberg.table.refs import MAIN_BRANCH, SnapshotRef
 from pyiceberg.table.snapshots import (
     Operation,
@@ -909,6 +915,13 @@ class Table:
     def update_schema(self, allow_incompatible_changes: bool = False, 
case_sensitive: bool = True) -> UpdateSchema:
         return UpdateSchema(self, 
allow_incompatible_changes=allow_incompatible_changes, 
case_sensitive=case_sensitive)
 
+    def name_mapping(self) -> NameMapping:
+        """Return the table's field-id NameMapping."""
+        if name_mapping_json := 
self.properties.get(SCHEMA_NAME_MAPPING_DEFAULT):
+            return parse_mapping_from_json(name_mapping_json)
+        else:
+            return create_mapping_from_schema(self.schema())
+
     def append(self, df: pa.Table) -> None:
         """
         Append data to the table.
diff --git a/pyiceberg/table/name_mapping.py b/pyiceberg/table/name_mapping.py
index ffe9635..84a295f 100644
--- a/pyiceberg/table/name_mapping.py
+++ b/pyiceberg/table/name_mapping.py
@@ -34,6 +34,8 @@ from pyiceberg.schema import Schema, SchemaVisitor, visit
 from pyiceberg.typedef import IcebergBaseModel, IcebergRootModel
 from pyiceberg.types import ListType, MapType, NestedField, PrimitiveType, 
StructType
 
+SCHEMA_NAME_MAPPING_DEFAULT = "schema.name-mapping.default"
+
 
 class MappedField(IcebergBaseModel):
     field_id: int = Field(alias="field-id")
diff --git a/tests/io/test_pyarrow_visitor.py b/tests/io/test_pyarrow_visitor.py
index c307440..0986eac 100644
--- a/tests/io/test_pyarrow_visitor.py
+++ b/tests/io/test_pyarrow_visitor.py
@@ -23,11 +23,13 @@ import pytest
 from pyiceberg.io.pyarrow import (
     _ConvertToArrowSchema,
     _ConvertToIceberg,
+    _HasIds,
     pyarrow_to_schema,
     schema_to_pyarrow,
     visit_pyarrow,
 )
 from pyiceberg.schema import Schema, visit
+from pyiceberg.table.name_mapping import MappedField, NameMapping
 from pyiceberg.types import (
     BinaryType,
     BooleanType,
@@ -49,6 +51,104 @@ from pyiceberg.types import (
 )
 
 
[email protected](scope="module")
+def pyarrow_schema_simple_without_ids() -> pa.Schema:
+    return pa.schema([pa.field('some_int', pa.int32(), nullable=True), 
pa.field('some_string', pa.string(), nullable=False)])
+
+
[email protected](scope="module")
+def pyarrow_schema_nested_without_ids() -> pa.Schema:
+    return pa.schema([
+        pa.field('foo', pa.string(), nullable=False),
+        pa.field('bar', pa.int32(), nullable=False),
+        pa.field('baz', pa.bool_(), nullable=True),
+        pa.field('qux', pa.list_(pa.string()), nullable=False),
+        pa.field(
+            'quux',
+            pa.map_(
+                pa.string(),
+                pa.map_(pa.string(), pa.int32()),
+            ),
+            nullable=False,
+        ),
+        pa.field(
+            'location',
+            pa.list_(
+                pa.struct([
+                    pa.field('latitude', pa.float32(), nullable=False),
+                    pa.field('longitude', pa.float32(), nullable=False),
+                ]),
+            ),
+            nullable=False,
+        ),
+        pa.field(
+            'person',
+            pa.struct([
+                pa.field('name', pa.string(), nullable=True),
+                pa.field('age', pa.int32(), nullable=False),
+            ]),
+            nullable=True,
+        ),
+    ])
+
+
[email protected](scope="module")
+def iceberg_schema_simple() -> Schema:
+    return Schema(
+        NestedField(field_id=1, name="some_int", field_type=IntegerType(), 
required=False),
+        NestedField(field_id=2, name="some_string", field_type=StringType(), 
required=True),
+    )
+
+
[email protected](scope="module")
+def iceberg_schema_nested() -> Schema:
+    return Schema(
+        NestedField(field_id=1, name="foo", field_type=StringType(), 
required=True),
+        NestedField(field_id=2, name="bar", field_type=IntegerType(), 
required=True),
+        NestedField(field_id=3, name="baz", field_type=BooleanType(), 
required=False),
+        NestedField(
+            field_id=4,
+            name="qux",
+            field_type=ListType(element_id=5, element_type=StringType(), 
element_required=False),
+            required=True,
+        ),
+        NestedField(
+            field_id=6,
+            name="quux",
+            field_type=MapType(
+                key_id=7,
+                key_type=StringType(),
+                value_id=8,
+                value_type=MapType(key_id=9, key_type=StringType(), 
value_id=10, value_type=IntegerType(), value_required=False),
+                value_required=False,
+            ),
+            required=True,
+        ),
+        NestedField(
+            field_id=11,
+            name="location",
+            field_type=ListType(
+                element_id=12,
+                element_type=StructType(
+                    NestedField(field_id=13, name="latitude", 
field_type=FloatType(), required=True),
+                    NestedField(field_id=14, name="longitude", 
field_type=FloatType(), required=True),
+                ),
+                element_required=False,
+            ),
+            required=True,
+        ),
+        NestedField(
+            field_id=15,
+            name="person",
+            field_type=StructType(
+                NestedField(field_id=16, name="name", field_type=StringType(), 
required=False),
+                NestedField(field_id=17, name="age", field_type=IntegerType(), 
required=True),
+            ),
+            required=False,
+        ),
+    )
+
+
 def test_pyarrow_binary_to_iceberg() -> None:
     length = 23
     pyarrow_type = pa.binary(length)
@@ -267,3 +367,208 @@ def 
test_round_schema_conversion_nested(table_schema_nested: Schema) -> None:
   15: person: optional struct<16: name: optional string, 17: age: required int>
 }"""
     assert actual == expected
+
+
+def test_simple_schema_has_missing_ids() -> None:
+    schema = pa.schema([
+        pa.field('foo', pa.string(), nullable=False),
+    ])
+    visitor = _HasIds()
+    has_ids = visit_pyarrow(schema, visitor)
+    assert not has_ids
+
+
+def test_simple_schema_has_missing_ids_partial() -> None:
+    schema = pa.schema([
+        pa.field('foo', pa.string(), nullable=False, 
metadata={"PARQUET:field_id": "1", "doc": "foo doc"}),
+        pa.field('bar', pa.int32(), nullable=False),
+    ])
+    visitor = _HasIds()
+    has_ids = visit_pyarrow(schema, visitor)
+    assert not has_ids
+
+
+def test_nested_schema_has_missing_ids() -> None:
+    schema = pa.schema([
+        pa.field('foo', pa.string(), nullable=False),
+        pa.field(
+            'quux',
+            pa.map_(
+                pa.string(),
+                pa.map_(pa.string(), pa.int32()),
+            ),
+            nullable=False,
+        ),
+    ])
+    visitor = _HasIds()
+    has_ids = visit_pyarrow(schema, visitor)
+    assert not has_ids
+
+
+def test_nested_schema_has_ids() -> None:
+    schema = pa.schema([
+        pa.field('foo', pa.string(), nullable=False, 
metadata={"PARQUET:field_id": "1", "doc": "foo doc"}),
+        pa.field(
+            'quux',
+            pa.map_(
+                pa.field("key", pa.string(), nullable=False, 
metadata={"PARQUET:field_id": "7"}),
+                pa.field(
+                    "value",
+                    pa.map_(
+                        pa.field('key', pa.string(), nullable=False, 
metadata={"PARQUET:field_id": "9"}),
+                        pa.field('value', pa.int32(), 
metadata={"PARQUET:field_id": "10"}),
+                    ),
+                    nullable=False,
+                    metadata={"PARQUET:field_id": "8"},
+                ),
+            ),
+            nullable=False,
+            metadata={"PARQUET:field_id": "6", "doc": "quux doc"},
+        ),
+    ])
+    visitor = _HasIds()
+    has_ids = visit_pyarrow(schema, visitor)
+    assert has_ids
+
+
+def test_nested_schema_has_partial_missing_ids() -> None:
+    schema = pa.schema([
+        pa.field('foo', pa.string(), nullable=False, 
metadata={"PARQUET:field_id": "1", "doc": "foo doc"}),
+        pa.field(
+            'quux',
+            pa.map_(
+                pa.field("key", pa.string(), nullable=False, 
metadata={"PARQUET:field_id": "7"}),
+                pa.field(
+                    "value",
+                    pa.map_(pa.field('key', pa.string(), nullable=False), 
pa.field('value', pa.int32())),
+                    nullable=False,
+                ),
+            ),
+            nullable=False,
+            metadata={"PARQUET:field_id": "6", "doc": "quux doc"},
+        ),
+    ])
+    visitor = _HasIds()
+    has_ids = visit_pyarrow(schema, visitor)
+    assert not has_ids
+
+
+def 
test_pyarrow_schema_to_schema_missing_ids_and_name_mapping(pyarrow_schema_simple_without_ids:
 pa.Schema) -> None:
+    schema = pyarrow_schema_simple_without_ids
+    with pytest.raises(ValueError) as exc_info:
+        _ = pyarrow_to_schema(schema)
+    assert (
+        "Parquet file does not have field-ids and the Iceberg table does not 
have 'schema.name-mapping.default' defined"
+        in str(exc_info.value)
+    )
+
+
+def test_simple_pyarrow_schema_to_schema_missing_ids_using_name_mapping(
+    pyarrow_schema_simple_without_ids: pa.Schema, iceberg_schema_simple: Schema
+) -> None:
+    schema = pyarrow_schema_simple_without_ids
+    name_mapping = NameMapping([
+        MappedField(field_id=1, names=['some_int']),
+        MappedField(field_id=2, names=['some_string']),
+    ])
+
+    assert pyarrow_to_schema(schema, name_mapping) == iceberg_schema_simple
+
+
+def 
test_simple_pyarrow_schema_to_schema_missing_ids_using_name_mapping_partial_exception(
+    pyarrow_schema_simple_without_ids: pa.Schema,
+) -> None:
+    schema = pyarrow_schema_simple_without_ids
+    name_mapping = NameMapping([
+        MappedField(field_id=1, names=['some_string']),
+    ])
+    with pytest.raises(ValueError) as exc_info:
+        _ = pyarrow_to_schema(schema, name_mapping)
+    assert "Could not find field with name: some_int" in str(exc_info.value)
+
+
+def test_nested_pyarrow_schema_to_schema_missing_ids_using_name_mapping(
+    pyarrow_schema_nested_without_ids: pa.Schema, iceberg_schema_nested: Schema
+) -> None:
+    schema = pyarrow_schema_nested_without_ids
+
+    name_mapping = NameMapping([
+        MappedField(field_id=1, names=['foo']),
+        MappedField(field_id=2, names=['bar']),
+        MappedField(field_id=3, names=['baz']),
+        MappedField(field_id=4, names=['qux'], fields=[MappedField(field_id=5, 
names=['element'])]),
+        MappedField(
+            field_id=6,
+            names=['quux'],
+            fields=[
+                MappedField(field_id=7, names=['key']),
+                MappedField(
+                    field_id=8,
+                    names=['value'],
+                    fields=[
+                        MappedField(field_id=9, names=['key']),
+                        MappedField(field_id=10, names=['value']),
+                    ],
+                ),
+            ],
+        ),
+        MappedField(
+            field_id=11,
+            names=['location'],
+            fields=[
+                MappedField(
+                    field_id=12,
+                    names=['element'],
+                    fields=[
+                        MappedField(field_id=13, names=['latitude']),
+                        MappedField(field_id=14, names=['longitude']),
+                    ],
+                )
+            ],
+        ),
+        MappedField(
+            field_id=15,
+            names=['person'],
+            fields=[
+                MappedField(field_id=16, names=['name']),
+                MappedField(field_id=17, names=['age']),
+            ],
+        ),
+    ])
+
+    assert pyarrow_to_schema(schema, name_mapping) == iceberg_schema_nested
+
+
+def 
test_pyarrow_schema_to_schema_missing_ids_using_name_mapping_nested_missing_id()
 -> None:
+    schema = pa.schema([
+        pa.field('foo', pa.string(), nullable=False),
+        pa.field(
+            'quux',
+            pa.map_(
+                pa.string(),
+                pa.map_(pa.string(), pa.int32()),
+            ),
+            nullable=False,
+        ),
+    ])
+
+    name_mapping = NameMapping([
+        MappedField(field_id=1, names=['foo']),
+        MappedField(
+            field_id=6,
+            names=['quux'],
+            fields=[
+                MappedField(field_id=7, names=['key']),
+                MappedField(
+                    field_id=8,
+                    names=['value'],
+                    fields=[
+                        MappedField(field_id=10, names=['value']),
+                    ],
+                ),
+            ],
+        ),
+    ])
+    with pytest.raises(ValueError) as exc_info:
+        _ = pyarrow_to_schema(schema, name_mapping)
+    assert "Could not find field with name: quux.value.key" in 
str(exc_info.value)

Reply via email to