This is an automated email from the ASF dual-hosted git repository.
honahx pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-python.git
The following commit(s) were added to refs/heads/main by this push:
new 70972d9 Apply Name mapping (#219)
70972d9 is described below
commit 70972d9436f24fd899a7a2c8600c96bb4d282e12
Author: Sung Yun <[email protected]>
AuthorDate: Thu Jan 18 20:30:01 2024 -0500
Apply Name mapping (#219)
---
pyiceberg/io/pyarrow.py | 229 ++++++++++++++++++++---------
pyiceberg/table/__init__.py | 13 ++
pyiceberg/table/name_mapping.py | 2 +
tests/io/test_pyarrow_visitor.py | 305 +++++++++++++++++++++++++++++++++++++++
4 files changed, 484 insertions(+), 65 deletions(-)
diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py
index b4988c6..035f5e8 100644
--- a/pyiceberg/io/pyarrow.py
+++ b/pyiceberg/io/pyarrow.py
@@ -124,6 +124,7 @@ from pyiceberg.schema import (
visit_with_partner,
)
from pyiceberg.table import WriteTask
+from pyiceberg.table.name_mapping import NameMapping
from pyiceberg.transforms import TruncateTransform
from pyiceberg.typedef import EMPTY_DICT, Properties, Record
from pyiceberg.types import (
@@ -164,6 +165,9 @@ ICEBERG_SCHEMA = b"iceberg.schema"
# The PARQUET: in front means that it is Parquet specific, in this case the
field_id
PYARROW_PARQUET_FIELD_ID_KEY = b"PARQUET:field_id"
PYARROW_FIELD_DOC_KEY = b"doc"
+LIST_ELEMENT_NAME = "element"
+MAP_KEY_NAME = "key"
+MAP_VALUE_NAME = "value"
T = TypeVar("T")
@@ -631,8 +635,16 @@ def _combine_positional_deletes(positional_deletes:
List[pa.ChunkedArray], rows:
return np.setdiff1d(np.arange(rows), all_chunks, assume_unique=False)
-def pyarrow_to_schema(schema: pa.Schema) -> Schema:
- visitor = _ConvertToIceberg()
+def pyarrow_to_schema(schema: pa.Schema, name_mapping: Optional[NameMapping] =
None) -> Schema:
+ has_ids = visit_pyarrow(schema, _HasIds())
+ if has_ids:
+ visitor = _ConvertToIceberg()
+ elif name_mapping is not None:
+ visitor = _ConvertToIceberg(name_mapping=name_mapping)
+ else:
+ raise ValueError(
+ "Parquet file does not have field-ids and the Iceberg table does
not have 'schema.name-mapping.default' defined"
+ )
return visit_pyarrow(schema, visitor)
@@ -653,50 +665,47 @@ def visit_pyarrow(obj: Union[pa.DataType, pa.Schema],
visitor: PyArrowSchemaVisi
@visit_pyarrow.register(pa.Schema)
-def _(obj: pa.Schema, visitor: PyArrowSchemaVisitor[T]) -> Optional[T]:
- struct_results: List[Optional[T]] = []
- for field in obj:
- visitor.before_field(field)
- struct_result = visit_pyarrow(field.type, visitor)
- visitor.after_field(field)
- struct_results.append(struct_result)
-
- return visitor.schema(obj, struct_results)
+def _(obj: pa.Schema, visitor: PyArrowSchemaVisitor[T]) -> T:
+ return visitor.schema(obj, visit_pyarrow(pa.struct(obj), visitor))
@visit_pyarrow.register(pa.StructType)
-def _(obj: pa.StructType, visitor: PyArrowSchemaVisitor[T]) -> Optional[T]:
- struct_results: List[Optional[T]] = []
+def _(obj: pa.StructType, visitor: PyArrowSchemaVisitor[T]) -> T:
+ results = []
+
for field in obj:
visitor.before_field(field)
- struct_result = visit_pyarrow(field.type, visitor)
+ result = visit_pyarrow(field.type, visitor)
+ results.append(visitor.field(field, result))
visitor.after_field(field)
- struct_results.append(struct_result)
- return visitor.struct(obj, struct_results)
+ return visitor.struct(obj, results)
@visit_pyarrow.register(pa.ListType)
-def _(obj: pa.ListType, visitor: PyArrowSchemaVisitor[T]) -> Optional[T]:
- visitor.before_field(obj.value_field)
- list_result = visit_pyarrow(obj.value_field.type, visitor)
- visitor.after_field(obj.value_field)
- return visitor.list(obj, list_result)
+def _(obj: pa.ListType, visitor: PyArrowSchemaVisitor[T]) -> T:
+ visitor.before_list_element(obj.value_field)
+ result = visit_pyarrow(obj.value_type, visitor)
+ visitor.after_list_element(obj.value_field)
+
+ return visitor.list(obj, result)
@visit_pyarrow.register(pa.MapType)
-def _(obj: pa.MapType, visitor: PyArrowSchemaVisitor[T]) -> Optional[T]:
- visitor.before_field(obj.key_field)
- key_result = visit_pyarrow(obj.key_field.type, visitor)
- visitor.after_field(obj.key_field)
- visitor.before_field(obj.item_field)
- value_result = visit_pyarrow(obj.item_field.type, visitor)
- visitor.after_field(obj.item_field)
+def _(obj: pa.MapType, visitor: PyArrowSchemaVisitor[T]) -> T:
+ visitor.before_map_key(obj.key_field)
+ key_result = visit_pyarrow(obj.key_type, visitor)
+ visitor.after_map_key(obj.key_field)
+
+ visitor.before_map_value(obj.item_field)
+ value_result = visit_pyarrow(obj.item_type, visitor)
+ visitor.after_map_value(obj.item_field)
+
return visitor.map(obj, key_result, value_result)
@visit_pyarrow.register(pa.DataType)
-def _(obj: pa.DataType, visitor: PyArrowSchemaVisitor[T]) -> Optional[T]:
+def _(obj: pa.DataType, visitor: PyArrowSchemaVisitor[T]) -> T:
if pa.types.is_nested(obj):
raise TypeError(f"Expected primitive type, got: {type(obj)}")
return visitor.primitive(obj)
@@ -709,24 +718,46 @@ class PyArrowSchemaVisitor(Generic[T], ABC):
def after_field(self, field: pa.Field) -> None:
"""Override this method to perform an action immediately after
visiting a field."""
+ def before_list_element(self, element: pa.Field) -> None:
+ """Override this method to perform an action immediately before
visiting an element within a ListType."""
+
+ def after_list_element(self, element: pa.Field) -> None:
+ """Override this method to perform an action immediately after
visiting an element within a ListType."""
+
+ def before_map_key(self, key: pa.Field) -> None:
+ """Override this method to perform an action immediately before
visiting a key within a MapType."""
+
+ def after_map_key(self, key: pa.Field) -> None:
+ """Override this method to perform an action immediately after
visiting a key within a MapType."""
+
+ def before_map_value(self, value: pa.Field) -> None:
+ """Override this method to perform an action immediately before
visiting a value within a MapType."""
+
+ def after_map_value(self, value: pa.Field) -> None:
+ """Override this method to perform an action immediately after
visiting a value within a MapType."""
+
@abstractmethod
- def schema(self, schema: pa.Schema, field_results: List[Optional[T]]) ->
Optional[T]:
+ def schema(self, schema: pa.Schema, struct_result: T) -> T:
"""Visit a schema."""
@abstractmethod
- def struct(self, struct: pa.StructType, field_results: List[Optional[T]])
-> Optional[T]:
+ def struct(self, struct: pa.StructType, field_results: List[T]) -> T:
"""Visit a struct."""
@abstractmethod
- def list(self, list_type: pa.ListType, element_result: Optional[T]) ->
Optional[T]:
+ def field(self, field: pa.Field, field_result: T) -> T:
+ """Visit a field."""
+
+ @abstractmethod
+ def list(self, list_type: pa.ListType, element_result: T) -> T:
"""Visit a list."""
@abstractmethod
- def map(self, map_type: pa.MapType, key_result: Optional[T], value_result:
Optional[T]) -> Optional[T]:
+ def map(self, map_type: pa.MapType, key_result: T, value_result: T) -> T:
"""Visit a map."""
@abstractmethod
- def primitive(self, primitive: pa.DataType) -> Optional[T]:
+ def primitive(self, primitive: pa.DataType) -> T:
"""Visit a primitive type."""
@@ -738,42 +769,84 @@ def _get_field_id(field: pa.Field) -> Optional[int]:
)
-class _ConvertToIceberg(PyArrowSchemaVisitor[Union[IcebergType, Schema]]):
- def _convert_fields(self, arrow_fields: Iterable[pa.Field], field_results:
List[Optional[IcebergType]]) -> List[NestedField]:
- fields = []
- for i, field in enumerate(arrow_fields):
- field_id = _get_field_id(field)
- field_doc = doc_str.decode() if (field.metadata and (doc_str :=
field.metadata.get(PYARROW_FIELD_DOC_KEY))) else None
- field_type = field_results[i]
- if field_type is not None and field_id is not None:
- fields.append(NestedField(field_id, field.name, field_type,
required=not field.nullable, doc=field_doc))
- return fields
-
- def schema(self, schema: pa.Schema, field_results:
List[Optional[IcebergType]]) -> Schema:
- return Schema(*self._convert_fields(schema, field_results))
-
- def struct(self, struct: pa.StructType, field_results:
List[Optional[IcebergType]]) -> IcebergType:
- return StructType(*self._convert_fields(struct, field_results))
-
- def list(self, list_type: pa.ListType, element_result:
Optional[IcebergType]) -> Optional[IcebergType]:
+class _HasIds(PyArrowSchemaVisitor[bool]):
+ def schema(self, schema: pa.Schema, struct_result: bool) -> bool:
+ return struct_result
+
+ def struct(self, struct: pa.StructType, field_results: List[bool]) -> bool:
+ return all(field_results)
+
+ def field(self, field: pa.Field, field_result: bool) -> bool:
+ return all([_get_field_id(field) is not None, field_result])
+
+ def list(self, list_type: pa.ListType, element_result: bool) -> bool:
element_field = list_type.value_field
element_id = _get_field_id(element_field)
- if element_result is not None and element_id is not None:
- return ListType(element_id, element_result, element_required=not
element_field.nullable)
- return None
+ return element_result and element_id is not None
- def map(
- self, map_type: pa.MapType, key_result: Optional[IcebergType],
value_result: Optional[IcebergType]
- ) -> Optional[IcebergType]:
+ def map(self, map_type: pa.MapType, key_result: bool, value_result: bool)
-> bool:
key_field = map_type.key_field
key_id = _get_field_id(key_field)
value_field = map_type.item_field
value_id = _get_field_id(value_field)
- if key_result is not None and value_result is not None and key_id is
not None and value_id is not None:
- return MapType(key_id, key_result, value_id, value_result,
value_required=not value_field.nullable)
- return None
+ return all([key_id is not None, value_id is not None, key_result,
value_result])
+
+ def primitive(self, primitive: pa.DataType) -> bool:
+ return True
- def primitive(self, primitive: pa.DataType) -> IcebergType:
+
+class _ConvertToIceberg(PyArrowSchemaVisitor[Union[IcebergType, Schema]]):
+ """Converts PyArrowSchema to Iceberg Schema. Applies the IDs from
name_mapping if provided."""
+
+ _field_names: List[str]
+ _name_mapping: Optional[NameMapping]
+
+ def __init__(self, name_mapping: Optional[NameMapping] = None) -> None:
+ self._field_names = []
+ self._name_mapping = name_mapping
+
+ def _current_path(self) -> str:
+ return ".".join(self._field_names)
+
+ def _field_id(self, field: pa.Field) -> int:
+ if self._name_mapping:
+ return self._name_mapping.find(self._current_path()).field_id
+ elif (field_id := _get_field_id(field)) is not None:
+ return field_id
+ else:
+ raise ValueError(f"Cannot convert {field} to Iceberg Field as
field_id is empty.")
+
+ def schema(self, schema: pa.Schema, struct_result: StructType) -> Schema:
+ return Schema(*struct_result.fields)
+
+ def struct(self, struct: pa.StructType, field_results: List[NestedField])
-> StructType:
+ return StructType(*field_results)
+
+ def field(self, field: pa.Field, field_result: IcebergType) -> NestedField:
+ field_id = self._field_id(field)
+ field_doc = doc_str.decode() if (field.metadata and (doc_str :=
field.metadata.get(PYARROW_FIELD_DOC_KEY))) else None
+ field_type = field_result
+ return NestedField(field_id, field.name, field_type, required=not
field.nullable, doc=field_doc)
+
+ def list(self, list_type: pa.ListType, element_result: IcebergType) ->
ListType:
+ element_field = list_type.value_field
+ self._field_names.append(LIST_ELEMENT_NAME)
+ element_id = self._field_id(element_field)
+ self._field_names.pop()
+ return ListType(element_id, element_result, element_required=not
element_field.nullable)
+
+ def map(self, map_type: pa.MapType, key_result: IcebergType, value_result:
IcebergType) -> MapType:
+ key_field = map_type.key_field
+ self._field_names.append(MAP_KEY_NAME)
+ key_id = self._field_id(key_field)
+ self._field_names.pop()
+ value_field = map_type.item_field
+ self._field_names.append(MAP_VALUE_NAME)
+ value_id = self._field_id(value_field)
+ self._field_names.pop()
+ return MapType(key_id, key_result, value_id, value_result,
value_required=not value_field.nullable)
+
+ def primitive(self, primitive: pa.DataType) -> PrimitiveType:
if pa.types.is_boolean(primitive):
return BooleanType()
elif pa.types.is_int32(primitive):
@@ -808,6 +881,30 @@ class
_ConvertToIceberg(PyArrowSchemaVisitor[Union[IcebergType, Schema]]):
raise TypeError(f"Unsupported type: {primitive}")
+ def before_field(self, field: pa.Field) -> None:
+ self._field_names.append(field.name)
+
+ def after_field(self, field: pa.Field) -> None:
+ self._field_names.pop()
+
+ def before_list_element(self, element: pa.Field) -> None:
+ self._field_names.append(LIST_ELEMENT_NAME)
+
+ def after_list_element(self, element: pa.Field) -> None:
+ self._field_names.pop()
+
+ def before_map_key(self, key: pa.Field) -> None:
+ self._field_names.append(MAP_KEY_NAME)
+
+ def after_map_key(self, element: pa.Field) -> None:
+ self._field_names.pop()
+
+ def before_map_value(self, value: pa.Field) -> None:
+ self._field_names.append(MAP_VALUE_NAME)
+
+ def after_map_value(self, element: pa.Field) -> None:
+ self._field_names.pop()
+
def _task_to_table(
fs: FileSystem,
@@ -819,6 +916,7 @@ def _task_to_table(
case_sensitive: bool,
row_counts: List[int],
limit: Optional[int] = None,
+ name_mapping: Optional[NameMapping] = None,
) -> Optional[pa.Table]:
if limit and sum(row_counts) >= limit:
return None
@@ -831,9 +929,9 @@ def _task_to_table(
schema_raw = None
if metadata := physical_schema.metadata:
schema_raw = metadata.get(ICEBERG_SCHEMA)
- # TODO: if field_ids are not present, Name Mapping should be
implemented to look them up in the table schema,
- # see https://github.com/apache/iceberg/issues/7451
- file_schema = Schema.model_validate_json(schema_raw) if schema_raw is
not None else pyarrow_to_schema(physical_schema)
+ file_schema = (
+ Schema.model_validate_json(schema_raw) if schema_raw is not None
else pyarrow_to_schema(physical_schema, name_mapping)
+ )
pyarrow_filter = None
if bound_row_filter is not AlwaysTrue():
@@ -970,6 +1068,7 @@ def project_table(
case_sensitive,
row_counts,
limit,
+ table.name_mapping(),
)
for task in tasks
]
diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py
index 7c1b0bd..057dd84 100644
--- a/pyiceberg/table/__init__.py
+++ b/pyiceberg/table/__init__.py
@@ -81,6 +81,12 @@ from pyiceberg.table.metadata import (
TableMetadata,
TableMetadataUtil,
)
+from pyiceberg.table.name_mapping import (
+ SCHEMA_NAME_MAPPING_DEFAULT,
+ NameMapping,
+ create_mapping_from_schema,
+ parse_mapping_from_json,
+)
from pyiceberg.table.refs import MAIN_BRANCH, SnapshotRef
from pyiceberg.table.snapshots import (
Operation,
@@ -909,6 +915,13 @@ class Table:
def update_schema(self, allow_incompatible_changes: bool = False,
case_sensitive: bool = True) -> UpdateSchema:
return UpdateSchema(self,
allow_incompatible_changes=allow_incompatible_changes,
case_sensitive=case_sensitive)
+ def name_mapping(self) -> NameMapping:
+ """Return the table's field-id NameMapping."""
+ if name_mapping_json :=
self.properties.get(SCHEMA_NAME_MAPPING_DEFAULT):
+ return parse_mapping_from_json(name_mapping_json)
+ else:
+ return create_mapping_from_schema(self.schema())
+
def append(self, df: pa.Table) -> None:
"""
Append data to the table.
diff --git a/pyiceberg/table/name_mapping.py b/pyiceberg/table/name_mapping.py
index ffe9635..84a295f 100644
--- a/pyiceberg/table/name_mapping.py
+++ b/pyiceberg/table/name_mapping.py
@@ -34,6 +34,8 @@ from pyiceberg.schema import Schema, SchemaVisitor, visit
from pyiceberg.typedef import IcebergBaseModel, IcebergRootModel
from pyiceberg.types import ListType, MapType, NestedField, PrimitiveType,
StructType
+SCHEMA_NAME_MAPPING_DEFAULT = "schema.name-mapping.default"
+
class MappedField(IcebergBaseModel):
field_id: int = Field(alias="field-id")
diff --git a/tests/io/test_pyarrow_visitor.py b/tests/io/test_pyarrow_visitor.py
index c307440..0986eac 100644
--- a/tests/io/test_pyarrow_visitor.py
+++ b/tests/io/test_pyarrow_visitor.py
@@ -23,11 +23,13 @@ import pytest
from pyiceberg.io.pyarrow import (
_ConvertToArrowSchema,
_ConvertToIceberg,
+ _HasIds,
pyarrow_to_schema,
schema_to_pyarrow,
visit_pyarrow,
)
from pyiceberg.schema import Schema, visit
+from pyiceberg.table.name_mapping import MappedField, NameMapping
from pyiceberg.types import (
BinaryType,
BooleanType,
@@ -49,6 +51,104 @@ from pyiceberg.types import (
)
+@pytest.fixture(scope="module")
+def pyarrow_schema_simple_without_ids() -> pa.Schema:
+ return pa.schema([pa.field('some_int', pa.int32(), nullable=True),
pa.field('some_string', pa.string(), nullable=False)])
+
+
+@pytest.fixture(scope="module")
+def pyarrow_schema_nested_without_ids() -> pa.Schema:
+ return pa.schema([
+ pa.field('foo', pa.string(), nullable=False),
+ pa.field('bar', pa.int32(), nullable=False),
+ pa.field('baz', pa.bool_(), nullable=True),
+ pa.field('qux', pa.list_(pa.string()), nullable=False),
+ pa.field(
+ 'quux',
+ pa.map_(
+ pa.string(),
+ pa.map_(pa.string(), pa.int32()),
+ ),
+ nullable=False,
+ ),
+ pa.field(
+ 'location',
+ pa.list_(
+ pa.struct([
+ pa.field('latitude', pa.float32(), nullable=False),
+ pa.field('longitude', pa.float32(), nullable=False),
+ ]),
+ ),
+ nullable=False,
+ ),
+ pa.field(
+ 'person',
+ pa.struct([
+ pa.field('name', pa.string(), nullable=True),
+ pa.field('age', pa.int32(), nullable=False),
+ ]),
+ nullable=True,
+ ),
+ ])
+
+
+@pytest.fixture(scope="module")
+def iceberg_schema_simple() -> Schema:
+ return Schema(
+ NestedField(field_id=1, name="some_int", field_type=IntegerType(),
required=False),
+ NestedField(field_id=2, name="some_string", field_type=StringType(),
required=True),
+ )
+
+
+@pytest.fixture(scope="module")
+def iceberg_schema_nested() -> Schema:
+ return Schema(
+ NestedField(field_id=1, name="foo", field_type=StringType(),
required=True),
+ NestedField(field_id=2, name="bar", field_type=IntegerType(),
required=True),
+ NestedField(field_id=3, name="baz", field_type=BooleanType(),
required=False),
+ NestedField(
+ field_id=4,
+ name="qux",
+ field_type=ListType(element_id=5, element_type=StringType(),
element_required=False),
+ required=True,
+ ),
+ NestedField(
+ field_id=6,
+ name="quux",
+ field_type=MapType(
+ key_id=7,
+ key_type=StringType(),
+ value_id=8,
+ value_type=MapType(key_id=9, key_type=StringType(),
value_id=10, value_type=IntegerType(), value_required=False),
+ value_required=False,
+ ),
+ required=True,
+ ),
+ NestedField(
+ field_id=11,
+ name="location",
+ field_type=ListType(
+ element_id=12,
+ element_type=StructType(
+ NestedField(field_id=13, name="latitude",
field_type=FloatType(), required=True),
+ NestedField(field_id=14, name="longitude",
field_type=FloatType(), required=True),
+ ),
+ element_required=False,
+ ),
+ required=True,
+ ),
+ NestedField(
+ field_id=15,
+ name="person",
+ field_type=StructType(
+ NestedField(field_id=16, name="name", field_type=StringType(),
required=False),
+ NestedField(field_id=17, name="age", field_type=IntegerType(),
required=True),
+ ),
+ required=False,
+ ),
+ )
+
+
def test_pyarrow_binary_to_iceberg() -> None:
length = 23
pyarrow_type = pa.binary(length)
@@ -267,3 +367,208 @@ def
test_round_schema_conversion_nested(table_schema_nested: Schema) -> None:
15: person: optional struct<16: name: optional string, 17: age: required int>
}"""
assert actual == expected
+
+
+def test_simple_schema_has_missing_ids() -> None:
+ schema = pa.schema([
+ pa.field('foo', pa.string(), nullable=False),
+ ])
+ visitor = _HasIds()
+ has_ids = visit_pyarrow(schema, visitor)
+ assert not has_ids
+
+
+def test_simple_schema_has_missing_ids_partial() -> None:
+ schema = pa.schema([
+ pa.field('foo', pa.string(), nullable=False,
metadata={"PARQUET:field_id": "1", "doc": "foo doc"}),
+ pa.field('bar', pa.int32(), nullable=False),
+ ])
+ visitor = _HasIds()
+ has_ids = visit_pyarrow(schema, visitor)
+ assert not has_ids
+
+
+def test_nested_schema_has_missing_ids() -> None:
+ schema = pa.schema([
+ pa.field('foo', pa.string(), nullable=False),
+ pa.field(
+ 'quux',
+ pa.map_(
+ pa.string(),
+ pa.map_(pa.string(), pa.int32()),
+ ),
+ nullable=False,
+ ),
+ ])
+ visitor = _HasIds()
+ has_ids = visit_pyarrow(schema, visitor)
+ assert not has_ids
+
+
+def test_nested_schema_has_ids() -> None:
+ schema = pa.schema([
+ pa.field('foo', pa.string(), nullable=False,
metadata={"PARQUET:field_id": "1", "doc": "foo doc"}),
+ pa.field(
+ 'quux',
+ pa.map_(
+ pa.field("key", pa.string(), nullable=False,
metadata={"PARQUET:field_id": "7"}),
+ pa.field(
+ "value",
+ pa.map_(
+ pa.field('key', pa.string(), nullable=False,
metadata={"PARQUET:field_id": "9"}),
+ pa.field('value', pa.int32(),
metadata={"PARQUET:field_id": "10"}),
+ ),
+ nullable=False,
+ metadata={"PARQUET:field_id": "8"},
+ ),
+ ),
+ nullable=False,
+ metadata={"PARQUET:field_id": "6", "doc": "quux doc"},
+ ),
+ ])
+ visitor = _HasIds()
+ has_ids = visit_pyarrow(schema, visitor)
+ assert has_ids
+
+
+def test_nested_schema_has_partial_missing_ids() -> None:
+ schema = pa.schema([
+ pa.field('foo', pa.string(), nullable=False,
metadata={"PARQUET:field_id": "1", "doc": "foo doc"}),
+ pa.field(
+ 'quux',
+ pa.map_(
+ pa.field("key", pa.string(), nullable=False,
metadata={"PARQUET:field_id": "7"}),
+ pa.field(
+ "value",
+ pa.map_(pa.field('key', pa.string(), nullable=False),
pa.field('value', pa.int32())),
+ nullable=False,
+ ),
+ ),
+ nullable=False,
+ metadata={"PARQUET:field_id": "6", "doc": "quux doc"},
+ ),
+ ])
+ visitor = _HasIds()
+ has_ids = visit_pyarrow(schema, visitor)
+ assert not has_ids
+
+
+def
test_pyarrow_schema_to_schema_missing_ids_and_name_mapping(pyarrow_schema_simple_without_ids:
pa.Schema) -> None:
+ schema = pyarrow_schema_simple_without_ids
+ with pytest.raises(ValueError) as exc_info:
+ _ = pyarrow_to_schema(schema)
+ assert (
+ "Parquet file does not have field-ids and the Iceberg table does not
have 'schema.name-mapping.default' defined"
+ in str(exc_info.value)
+ )
+
+
+def test_simple_pyarrow_schema_to_schema_missing_ids_using_name_mapping(
+ pyarrow_schema_simple_without_ids: pa.Schema, iceberg_schema_simple: Schema
+) -> None:
+ schema = pyarrow_schema_simple_without_ids
+ name_mapping = NameMapping([
+ MappedField(field_id=1, names=['some_int']),
+ MappedField(field_id=2, names=['some_string']),
+ ])
+
+ assert pyarrow_to_schema(schema, name_mapping) == iceberg_schema_simple
+
+
+def
test_simple_pyarrow_schema_to_schema_missing_ids_using_name_mapping_partial_exception(
+ pyarrow_schema_simple_without_ids: pa.Schema,
+) -> None:
+ schema = pyarrow_schema_simple_without_ids
+ name_mapping = NameMapping([
+ MappedField(field_id=1, names=['some_string']),
+ ])
+ with pytest.raises(ValueError) as exc_info:
+ _ = pyarrow_to_schema(schema, name_mapping)
+ assert "Could not find field with name: some_int" in str(exc_info.value)
+
+
+def test_nested_pyarrow_schema_to_schema_missing_ids_using_name_mapping(
+ pyarrow_schema_nested_without_ids: pa.Schema, iceberg_schema_nested: Schema
+) -> None:
+ schema = pyarrow_schema_nested_without_ids
+
+ name_mapping = NameMapping([
+ MappedField(field_id=1, names=['foo']),
+ MappedField(field_id=2, names=['bar']),
+ MappedField(field_id=3, names=['baz']),
+ MappedField(field_id=4, names=['qux'], fields=[MappedField(field_id=5,
names=['element'])]),
+ MappedField(
+ field_id=6,
+ names=['quux'],
+ fields=[
+ MappedField(field_id=7, names=['key']),
+ MappedField(
+ field_id=8,
+ names=['value'],
+ fields=[
+ MappedField(field_id=9, names=['key']),
+ MappedField(field_id=10, names=['value']),
+ ],
+ ),
+ ],
+ ),
+ MappedField(
+ field_id=11,
+ names=['location'],
+ fields=[
+ MappedField(
+ field_id=12,
+ names=['element'],
+ fields=[
+ MappedField(field_id=13, names=['latitude']),
+ MappedField(field_id=14, names=['longitude']),
+ ],
+ )
+ ],
+ ),
+ MappedField(
+ field_id=15,
+ names=['person'],
+ fields=[
+ MappedField(field_id=16, names=['name']),
+ MappedField(field_id=17, names=['age']),
+ ],
+ ),
+ ])
+
+ assert pyarrow_to_schema(schema, name_mapping) == iceberg_schema_nested
+
+
+def
test_pyarrow_schema_to_schema_missing_ids_using_name_mapping_nested_missing_id()
-> None:
+ schema = pa.schema([
+ pa.field('foo', pa.string(), nullable=False),
+ pa.field(
+ 'quux',
+ pa.map_(
+ pa.string(),
+ pa.map_(pa.string(), pa.int32()),
+ ),
+ nullable=False,
+ ),
+ ])
+
+ name_mapping = NameMapping([
+ MappedField(field_id=1, names=['foo']),
+ MappedField(
+ field_id=6,
+ names=['quux'],
+ fields=[
+ MappedField(field_id=7, names=['key']),
+ MappedField(
+ field_id=8,
+ names=['value'],
+ fields=[
+ MappedField(field_id=10, names=['value']),
+ ],
+ ),
+ ],
+ ),
+ ])
+ with pytest.raises(ValueError) as exc_info:
+ _ = pyarrow_to_schema(schema, name_mapping)
+ assert "Could not find field with name: quux.value.key" in
str(exc_info.value)