JonasJ-ap commented on code in PR #6997:
URL: https://github.com/apache/iceberg/pull/6997#discussion_r1125317095


##########
python/tests/io/test_pyarrow.py:
##########
@@ -1130,3 +1131,15 @@ def 
test_projection_filter_on_unknown_field(schema_int_str: Schema, file_int_str
         _ = project(schema, [file_int_str], GreaterThan("unknown_field", "1"), 
schema_int_str)
 
     assert "Could not find field with name unknown_field, case_sensitive=True" 
in str(exc_info.value)
+
+
+def test_pyarrow_to_schema_simple(table_schema_simple: Schema, 
pyarrow_schema_simple: pa.Schema) -> None:
+    actual = str(pyarrow_to_schema(pyarrow_schema_simple))
+    expected = str(table_schema_simple)
+    assert actual == expected
+
+
+def test_pyarrow_to_schema_nested(table_schema_nested: Schema, 
pyarrow_schema_nested: pa.Schema) -> None:
+    actual = str(pyarrow_to_schema(pyarrow_schema_nested))

Review Comment:
   Initially, I was thinking of using `schema_to_pyarrow` and 
`pyarrow_to_schema` to do a round-trip conversion to verify correctness. 
However, I found that `schema_to_pyarrow` does not write the field ID into the 
metadata of the list element or the map key/value fields.
   
https://github.com/apache/iceberg/blob/ad0f04dd90cf42011535207e661924c3b0456f1c/python/pyiceberg/io/pyarrow.py#L354-L359
   may need to be changed to 
   ```python
       def list(self, list_type: ListType, element_result: pa.DataType) -> 
pa.DataType:
           element_field = self.field(list_type.element_field, element_result)
           return pa.list_(value_type=element_field)
   
       def map(self, map_type: MapType, key_result: pa.DataType, value_result: 
pa.DataType) -> pa.DataType:
           key_field = self.field(map_type.key_field, key_result)
           value_field = self.field(map_type.value_field, value_result)
           return pa.map_(key_type=key_field, item_type=value_field)
   ```
   to preserve the field id in the metadata
   
   As far as I can tell, such a change would only benefit the unit tests at 
this stage, so I chose not to apply it and instead hardcoded a pyarrow schema 
fixture in `conftest.py` for testing.



##########
python/tests/io/test_pyarrow.py:
##########
@@ -1130,3 +1131,15 @@ def 
test_projection_filter_on_unknown_field(schema_int_str: Schema, file_int_str
         _ = project(schema, [file_int_str], GreaterThan("unknown_field", "1"), 
schema_int_str)
 
     assert "Could not find field with name unknown_field, case_sensitive=True" 
in str(exc_info.value)
+
+
+def test_pyarrow_to_schema_simple(table_schema_simple: Schema, 
pyarrow_schema_simple: pa.Schema) -> None:
+    actual = str(pyarrow_to_schema(pyarrow_schema_simple))
+    expected = str(table_schema_simple)
+    assert actual == expected
+
+
+def test_pyarrow_to_schema_nested(table_schema_nested: Schema, 
pyarrow_schema_nested: pa.Schema) -> None:
+    actual = str(pyarrow_to_schema(pyarrow_schema_nested))
+    expected = str(table_schema_nested)

Review Comment:
   I used `str` instead of `repr` because `repr` exposes 
`schema_id=1,identifier_field_ids=[1]`, which the `pyarrow_to_schema` visitor 
does not handle. I think the visitor does not need to handle these, since the 
schema it generates will not be written into any table metadata file.
   
   Please correct me if I misunderstand something.



##########
python/pyiceberg/io/pyarrow.py:
##########
@@ -476,6 +483,202 @@ def expression_to_pyarrow(expr: BooleanExpression) -> 
pc.Expression:
     return boolean_expression_visit(expr, _ConvertToArrowExpression())
 
 
+def pyarrow_to_schema(schema: pa.Schema) -> Schema:
+    visitor = _ConvertToIceberg()
+    struct_results = []
+    for i in range(len(schema.names)):
+        field = schema.field(i)
+        visitor.before_field(field)
+        struct_result = visit_pyarrow(field.type, visitor)
+        visitor.after_field(field)
+        struct_results.append(struct_result)
+    return visitor.schema(schema, struct_results)
+
+
+@singledispatch
+def visit_pyarrow(obj: pa.DataType, visitor: PyarrowSchemaVisitor[T]) -> T:
+    """A generic function for applying a pyarrow schema visitor to any point 
within a schema
+
+    The function traverses the schema in post-order fashion
+
+    Args:
+        obj(Schema | IcebergType): An instance of a Schema or an IcebergType
+        visitor (PyarrowSchemaVisitor[T]): An instance of an implementation of 
the generic PyarrowSchemaVisitor base class
+
+    Raises:
+        NotImplementedError: If attempting to visit an unrecognized object type
+    """
+    raise NotImplementedError("Cannot visit non-type: %s" % obj)
+
+
+@visit_pyarrow.register(pa.StructType)
+def _(obj: pa.StructType, visitor: PyarrowSchemaVisitor[T]) -> T:
+    struct_results = []
+    for field in obj:
+        visitor.before_field(field)
+        struct_result = visit_pyarrow(field.type, visitor)
+        visitor.after_field(field)
+        struct_results.append(struct_result)
+
+    return visitor.struct(obj, struct_results)
+
+
+@visit_pyarrow.register(pa.ListType)
+def _(obj: pa.ListType, visitor: PyarrowSchemaVisitor[T]) -> T:
+    visitor.before_list_element(obj.value_field)
+    list_result = visit_pyarrow(obj.value_field.type, visitor)
+    visitor.after_list_element(obj.value_field)
+    return visitor.list(obj, list_result)
+
+
+@visit_pyarrow.register(pa.MapType)
+def _(obj: pa.MapType, visitor: PyarrowSchemaVisitor[T]) -> T:
+    visitor.before_map_key(obj.key_field)
+    key_result = visit_pyarrow(obj.key_field.type, visitor)
+    visitor.after_map_key(obj.key_field)
+    visitor.before_map_value(obj.item_field)
+    value_result = visit_pyarrow(obj.item_field.type, visitor)
+    visitor.after_map_value(obj.item_field)
+    return visitor.map(obj, key_result, value_result)
+
+
+@visit_pyarrow.register(pa.DataType)
+def _(obj: pa.DataType, visitor: PyarrowSchemaVisitor[T]) -> T:
+    if pa.types.is_nested(obj):
+        raise TypeError(f"Expected primitive type, got {type(obj)}")
+    return visitor.primitive(obj)
+
+
+class PyarrowSchemaVisitor(Generic[T], ABC):
+    def before_field(self, field: pa.Field) -> None:
+        """Override this method to perform an action immediately before 
visiting a field."""
+
+    def after_field(self, field: pa.Field) -> None:
+        """Override this method to perform an action immediately after 
visiting a field."""
+
+    def before_list_element(self, element: pa.Field) -> None:
+        """Override this method to perform an action immediately before 
visiting a list element."""
+
+    def after_list_element(self, element: pa.Field) -> None:
+        """Override this method to perform an action immediately after 
visiting a list element."""
+
+    def before_map_key(self, key: pa.Field) -> None:
+        """Override this method to perform an action immediately before 
visiting a map key."""
+
+    def after_map_key(self, key: pa.Field) -> None:
+        """Override this method to perform an action immediately after 
visiting a map key."""
+
+    def before_map_value(self, value: pa.Field) -> None:
+        """Override this method to perform an action immediately before 
visiting a map value."""
+
+    def after_map_value(self, value: pa.Field) -> None:
+        """Override this method to perform an action immediately after 
visiting a map value."""
+
+    @abstractmethod
+    def schema(self, schema: pa.Schema, field_results: List[T]) -> Schema:
+        """visit a schema"""
+
+    @abstractmethod
+    def struct(self, struct: pa.StructType, field_results: List[T]) -> T:
+        """visit a struct"""
+
+    @abstractmethod
+    def list(self, list_type: pa.ListType, element_result: T) -> T:
+        """visit a list"""
+
+    @abstractmethod
+    def map(self, map_type: pa.MapType, key_result: T, value_result: T) -> T:
+        """visit a map"""
+
+    @abstractmethod
+    def primitive(self, primitive: pa.DataType) -> T:
+        """visit a primitive type"""
+
+
+def _get_field_id(field: pa.Field) -> int:
+    if field.metadata is not None:
+        field_metadata = {k.decode(): v.decode() for k, v in 
field.metadata.items()}
+        if field_id := field_metadata.get("PARQUET:field_id"):
+            return int(field_id)
+    raise ValueError(f"Field {field.name} does not have a field_id")
+
+
+class _ConvertToIceberg(PyarrowSchemaVisitor[IcebergType], ABC):
+    def schema(self, schema: pa.Schema, field_results: List[IcebergType]) -> 
Schema:
+        fields = []
+        for i in range(len(schema.names)):
+            field = schema.field(i)
+            field_id = _get_field_id(field)
+            field_type = field_results[i]
+            if field_type is not None:
+                fields.append(NestedField(field_id, field.name, field_type, 
required=not field.nullable))
+        return Schema(*fields)
+
+    def struct(self, struct: pa.StructType, field_results: List[IcebergType]) 
-> IcebergType:
+        fields = []
+        for i in range(struct.num_fields):
+            field = struct[i]
+            field_id = _get_field_id(field)
+            # may need to check doc strings

Review Comment:
   I reconsidered this and think we do not need to fetch the optional doc from 
the metadata, as the inferred schema is only used to determine the 
correspondence between field names and field IDs.



##########
python/pyiceberg/io/pyarrow.py:
##########
@@ -476,6 +483,202 @@ def expression_to_pyarrow(expr: BooleanExpression) -> 
pc.Expression:
     return boolean_expression_visit(expr, _ConvertToArrowExpression())
 
 
+def pyarrow_to_schema(schema: pa.Schema) -> Schema:
+    visitor = _ConvertToIceberg()
+    struct_results = []
+    for i in range(len(schema.names)):
+        field = schema.field(i)
+        visitor.before_field(field)
+        struct_result = visit_pyarrow(field.type, visitor)
+        visitor.after_field(field)
+        struct_results.append(struct_result)
+    return visitor.schema(schema, struct_results)
+
+
+@singledispatch
+def visit_pyarrow(obj: pa.DataType, visitor: PyarrowSchemaVisitor[T]) -> T:
+    """A generic function for applying a pyarrow schema visitor to any point 
within a schema
+
+    The function traverses the schema in post-order fashion
+
+    Args:
+        obj(Schema | IcebergType): An instance of a Schema or an IcebergType
+        visitor (PyarrowSchemaVisitor[T]): An instance of an implementation of 
the generic PyarrowSchemaVisitor base class
+
+    Raises:
+        NotImplementedError: If attempting to visit an unrecognized object type
+    """
+    raise NotImplementedError("Cannot visit non-type: %s" % obj)
+
+
+@visit_pyarrow.register(pa.StructType)
+def _(obj: pa.StructType, visitor: PyarrowSchemaVisitor[T]) -> T:
+    struct_results = []
+    for field in obj:
+        visitor.before_field(field)
+        struct_result = visit_pyarrow(field.type, visitor)
+        visitor.after_field(field)
+        struct_results.append(struct_result)
+
+    return visitor.struct(obj, struct_results)
+
+
+@visit_pyarrow.register(pa.ListType)
+def _(obj: pa.ListType, visitor: PyarrowSchemaVisitor[T]) -> T:
+    visitor.before_list_element(obj.value_field)
+    list_result = visit_pyarrow(obj.value_field.type, visitor)
+    visitor.after_list_element(obj.value_field)
+    return visitor.list(obj, list_result)
+
+
+@visit_pyarrow.register(pa.MapType)
+def _(obj: pa.MapType, visitor: PyarrowSchemaVisitor[T]) -> T:
+    visitor.before_map_key(obj.key_field)
+    key_result = visit_pyarrow(obj.key_field.type, visitor)
+    visitor.after_map_key(obj.key_field)
+    visitor.before_map_value(obj.item_field)
+    value_result = visit_pyarrow(obj.item_field.type, visitor)
+    visitor.after_map_value(obj.item_field)
+    return visitor.map(obj, key_result, value_result)
+
+
+@visit_pyarrow.register(pa.DataType)
+def _(obj: pa.DataType, visitor: PyarrowSchemaVisitor[T]) -> T:
+    if pa.types.is_nested(obj):
+        raise TypeError(f"Expected primitive type, got {type(obj)}")
+    return visitor.primitive(obj)
+
+
+class PyarrowSchemaVisitor(Generic[T], ABC):
+    def before_field(self, field: pa.Field) -> None:
+        """Override this method to perform an action immediately before 
visiting a field."""
+
+    def after_field(self, field: pa.Field) -> None:
+        """Override this method to perform an action immediately after 
visiting a field."""
+
+    def before_list_element(self, element: pa.Field) -> None:
+        """Override this method to perform an action immediately before 
visiting a list element."""
+
+    def after_list_element(self, element: pa.Field) -> None:
+        """Override this method to perform an action immediately after 
visiting a list element."""
+
+    def before_map_key(self, key: pa.Field) -> None:
+        """Override this method to perform an action immediately before 
visiting a map key."""
+
+    def after_map_key(self, key: pa.Field) -> None:
+        """Override this method to perform an action immediately after 
visiting a map key."""
+
+    def before_map_value(self, value: pa.Field) -> None:
+        """Override this method to perform an action immediately before 
visiting a map value."""
+
+    def after_map_value(self, value: pa.Field) -> None:
+        """Override this method to perform an action immediately after 
visiting a map value."""
+
+    @abstractmethod
+    def schema(self, schema: pa.Schema, field_results: List[T]) -> Schema:
+        """visit a schema"""
+
+    @abstractmethod
+    def struct(self, struct: pa.StructType, field_results: List[T]) -> T:
+        """visit a struct"""
+
+    @abstractmethod
+    def list(self, list_type: pa.ListType, element_result: T) -> T:
+        """visit a list"""
+
+    @abstractmethod
+    def map(self, map_type: pa.MapType, key_result: T, value_result: T) -> T:
+        """visit a map"""
+
+    @abstractmethod
+    def primitive(self, primitive: pa.DataType) -> T:
+        """visit a primitive type"""
+
+
+def _get_field_id(field: pa.Field) -> int:
+    if field.metadata is not None:
+        field_metadata = {k.decode(): v.decode() for k, v in 
field.metadata.items()}
+        if field_id := field_metadata.get("PARQUET:field_id"):

Review Comment:
   Just to confirm: is `PARQUET:field_id` the only metadata key that represents 
the field ID, so that we do not need to check other similar names such as `id` 
or `field_id`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to