Fokko commented on code in PR #6997:
URL: https://github.com/apache/iceberg/pull/6997#discussion_r1125699210


##########
python/pyiceberg/io/pyarrow.py:
##########
@@ -15,6 +15,7 @@
 # specific language governing permissions and limitations
 # under the License.
 # pylint: disable=redefined-outer-name,arguments-renamed

Review Comment:
   Instead of adding another line, you can just append `W0511` (its symbolic name is `fixme`) to the existing disable list:
   
   ```suggestion
   # pylint: disable=redefined-outer-name,arguments-renamed,fixme
   ```



##########
python/tests/io/test_pyarrow.py:
##########
@@ -1130,3 +1131,15 @@ def 
test_projection_filter_on_unknown_field(schema_int_str: Schema, file_int_str
         _ = project(schema, [file_int_str], GreaterThan("unknown_field", "1"), 
schema_int_str)
 
     assert "Could not find field with name unknown_field, case_sensitive=True" 
in str(exc_info.value)
+
+
+def test_pyarrow_to_schema_simple(table_schema_simple: Schema, 
pyarrow_schema_simple: pa.Schema) -> None:
+    actual = str(pyarrow_to_schema(pyarrow_schema_simple))
+    expected = str(table_schema_simple)
+    assert actual == expected
+
+
+def test_pyarrow_to_schema_nested(table_schema_nested: Schema, 
pyarrow_schema_nested: pa.Schema) -> None:
+    actual = str(pyarrow_to_schema(pyarrow_schema_nested))

Review Comment:
   Thank you for giving this a try. I did some testing on my end, and adding 
the fields works perfectly fine for Arrow. I've created a table and read it 
using PyIceberg:
   
   ```sql
   CREATE TABLE default.maps_and_lists(
       some_list ARRAY<string>,
       some_amp  MAP<string, int>
   )
   
   INSERT INTO default.maps_and_lists VALUES(array("a", "b", "c"), map("a", 1, 
"b", 2, "c", 3))
   ```
   With the changes that you suggested, I can read it straight into a dataframe:
   
   ```python
   def test_vo():
       from pyiceberg.catalog import load_catalog
       cat = load_catalog('local')
       tbl = cat.load_table('default.maps_and_lists')
       df = tbl.scan().to_arrow()
       print(df)
   ```
   
   Which shows:
   
   ```
   some_list: list<item: string>
     child 0, item: string
   some_amp: map<string, int32>
     child 0, entries: struct<key: string not null, value: int32> not null
         child 0, key: string not null
         child 1, value: int32
   ----
   some_list: [[["a","b","c"]]]
   some_amp: [[keys:["a","b","c"]values:[1,2,3]]]
   ```
   I'm in favor of changing this.



##########
python/pyiceberg/io/pyarrow.py:
##########
@@ -476,6 +483,202 @@ def expression_to_pyarrow(expr: BooleanExpression) -> 
pc.Expression:
     return boolean_expression_visit(expr, _ConvertToArrowExpression())
 
 
+def pyarrow_to_schema(schema: pa.Schema) -> Schema:
+    visitor = _ConvertToIceberg()
+    struct_results = []
+    for i in range(len(schema.names)):
+        field = schema.field(i)
+        visitor.before_field(field)
+        struct_result = visit_pyarrow(field.type, visitor)
+        visitor.after_field(field)
+        struct_results.append(struct_result)
+    return visitor.schema(schema, struct_results)
+
+
+@singledispatch
+def visit_pyarrow(obj: pa.DataType, visitor: PyarrowSchemaVisitor[T]) -> T:
+    """A generic function for applying a pyarrow schema visitor to any point 
within a schema
+
+    The function traverses the schema in post-order fashion
+
+    Args:
+        obj(Schema | IcebergType): An instance of a Schema or an IcebergType
+        visitor (PyarrowSchemaVisitor[T]): An instance of an implementation of 
the generic PyarrowSchemaVisitor base class
+
+    Raises:
+        NotImplementedError: If attempting to visit an unrecognized object type
+    """
+    raise NotImplementedError("Cannot visit non-type: %s" % obj)
+
+
+@visit_pyarrow.register(pa.StructType)
+def _(obj: pa.StructType, visitor: PyarrowSchemaVisitor[T]) -> T:
+    struct_results = []
+    for field in obj:
+        visitor.before_field(field)
+        struct_result = visit_pyarrow(field.type, visitor)
+        visitor.after_field(field)
+        struct_results.append(struct_result)
+
+    return visitor.struct(obj, struct_results)
+
+
+@visit_pyarrow.register(pa.ListType)
+def _(obj: pa.ListType, visitor: PyarrowSchemaVisitor[T]) -> T:
+    visitor.before_list_element(obj.value_field)
+    list_result = visit_pyarrow(obj.value_field.type, visitor)
+    visitor.after_list_element(obj.value_field)
+    return visitor.list(obj, list_result)
+
+
+@visit_pyarrow.register(pa.MapType)
+def _(obj: pa.MapType, visitor: PyarrowSchemaVisitor[T]) -> T:
+    visitor.before_map_key(obj.key_field)
+    key_result = visit_pyarrow(obj.key_field.type, visitor)
+    visitor.after_map_key(obj.key_field)
+    visitor.before_map_value(obj.item_field)
+    value_result = visit_pyarrow(obj.item_field.type, visitor)
+    visitor.after_map_value(obj.item_field)
+    return visitor.map(obj, key_result, value_result)
+
+
+@visit_pyarrow.register(pa.DataType)
+def _(obj: pa.DataType, visitor: PyarrowSchemaVisitor[T]) -> T:
+    if pa.types.is_nested(obj):
+        raise TypeError(f"Expected primitive type, got {type(obj)}")
+    return visitor.primitive(obj)
+
+
+class PyarrowSchemaVisitor(Generic[T], ABC):
+    def before_field(self, field: pa.Field) -> None:
+        """Override this method to perform an action immediately before 
visiting a field."""
+
+    def after_field(self, field: pa.Field) -> None:
+        """Override this method to perform an action immediately after 
visiting a field."""
+
+    def before_list_element(self, element: pa.Field) -> None:
+        """Override this method to perform an action immediately before 
visiting a list element."""
+
+    def after_list_element(self, element: pa.Field) -> None:
+        """Override this method to perform an action immediately after 
visiting a list element."""
+
+    def before_map_key(self, key: pa.Field) -> None:
+        """Override this method to perform an action immediately before 
visiting a map key."""
+
+    def after_map_key(self, key: pa.Field) -> None:
+        """Override this method to perform an action immediately after 
visiting a map key."""
+
+    def before_map_value(self, value: pa.Field) -> None:
+        """Override this method to perform an action immediately before 
visiting a map value."""
+
+    def after_map_value(self, value: pa.Field) -> None:
+        """Override this method to perform an action immediately after 
visiting a map value."""
+
+    @abstractmethod
+    def schema(self, schema: pa.Schema, field_results: List[T]) -> Schema:
+        """visit a schema"""
+
+    @abstractmethod
+    def struct(self, struct: pa.StructType, field_results: List[T]) -> T:
+        """visit a struct"""
+
+    @abstractmethod
+    def list(self, list_type: pa.ListType, element_result: T) -> T:
+        """visit a list"""
+
+    @abstractmethod
+    def map(self, map_type: pa.MapType, key_result: T, value_result: T) -> T:
+        """visit a map"""
+
+    @abstractmethod
+    def primitive(self, primitive: pa.DataType) -> T:
+        """visit a primitive type"""
+
+
+def _get_field_id(field: pa.Field) -> int:
+    if field.metadata is not None:
+        field_metadata = {k.decode(): v.decode() for k, v in 
field.metadata.items()}
+        if field_id := field_metadata.get("PARQUET:field_id"):

Review Comment:
   There will be other field-id metadata keys for ORC and Avro, but I think we can start with this one.



##########
python/pyiceberg/io/pyarrow.py:
##########
@@ -476,6 +483,202 @@ def expression_to_pyarrow(expr: BooleanExpression) -> 
pc.Expression:
     return boolean_expression_visit(expr, _ConvertToArrowExpression())
 
 
+def pyarrow_to_schema(schema: pa.Schema) -> Schema:
+    visitor = _ConvertToIceberg()
+    struct_results = []
+    for i in range(len(schema.names)):
+        field = schema.field(i)
+        visitor.before_field(field)
+        struct_result = visit_pyarrow(field.type, visitor)
+        visitor.after_field(field)
+        struct_results.append(struct_result)
+    return visitor.schema(schema, struct_results)
+
+
+@singledispatch
+def visit_pyarrow(obj: pa.DataType, visitor: PyarrowSchemaVisitor[T]) -> T:
+    """A generic function for applying a pyarrow schema visitor to any point 
within a schema
+
+    The function traverses the schema in post-order fashion
+
+    Args:
+        obj(Schema | IcebergType): An instance of a Schema or an IcebergType

Review Comment:
   ```suggestion
           obj(Schema | pa.DataType): An instance of a Schema or an IcebergType
   ```



##########
python/pyiceberg/io/pyarrow.py:
##########
@@ -492,11 +710,7 @@ def _file_to_table(
         schema_raw = None
         if metadata := parquet_schema.metadata:
             schema_raw = metadata.get(ICEBERG_SCHEMA)
-        if schema_raw is None:
-            raise ValueError(
-                "Iceberg schema is not embedded into the Parquet file, see 
https://github.com/apache/iceberg/issues/6505";
-            )
-        file_schema = Schema.parse_raw(schema_raw)
+        file_schema = Schema.parse_raw(schema_raw) if schema_raw is not None 
else pyarrow_to_schema(parquet_schema)

Review Comment:
   Exactly!



##########
python/tests/io/test_pyarrow.py:
##########
@@ -1130,3 +1131,15 @@ def 
test_projection_filter_on_unknown_field(schema_int_str: Schema, file_int_str
         _ = project(schema, [file_int_str], GreaterThan("unknown_field", "1"), 
schema_int_str)
 
     assert "Could not find field with name unknown_field, case_sensitive=True" 
in str(exc_info.value)
+
+
+def test_pyarrow_to_schema_simple(table_schema_simple: Schema, 
pyarrow_schema_simple: pa.Schema) -> None:
+    actual = str(pyarrow_to_schema(pyarrow_schema_simple))
+    expected = str(table_schema_simple)
+    assert actual == expected
+
+
+def test_pyarrow_to_schema_nested(table_schema_nested: Schema, 
pyarrow_schema_nested: pa.Schema) -> None:
+    actual = str(pyarrow_to_schema(pyarrow_schema_nested))
+    expected = str(table_schema_nested)

Review Comment:
   Personally, I prefer to inline the expected value as a literal string in the assertion:
   ```python
   def test_pyarrow_to_schema_nested(table_schema_nested: Schema, 
pyarrow_schema_nested: pa.Schema) -> None:
       actual = str(pyarrow_to_schema(pyarrow_schema_nested))
       assert actual == """table {
     1: foo: optional string
     2: bar: required int
     3: baz: optional boolean
     4: qux: required list<string>
     6: quux: required map<string, map<string, int>>
     11: location: required list<struct<13: latitude: optional float, 14: 
longitude: optional float>>
     15: person: optional struct<16: name: optional string, 17: age: required 
int>
   }"""
   
   ```
   
   This way you can see exactly what you're asserting, which gives me more confidence :)



##########
python/tests/io/test_pyarrow.py:
##########
@@ -1130,3 +1131,15 @@ def 
test_projection_filter_on_unknown_field(schema_int_str: Schema, file_int_str
         _ = project(schema, [file_int_str], GreaterThan("unknown_field", "1"), 
schema_int_str)
 
     assert "Could not find field with name unknown_field, case_sensitive=True" 
in str(exc_info.value)
+
+
+def test_pyarrow_to_schema_simple(table_schema_simple: Schema, 
pyarrow_schema_simple: pa.Schema) -> None:
+    actual = str(pyarrow_to_schema(pyarrow_schema_simple))
+    expected = str(table_schema_simple)
+    assert actual == expected
+
+
+def test_pyarrow_to_schema_nested(table_schema_nested: Schema, 
pyarrow_schema_nested: pa.Schema) -> None:
+    actual = str(pyarrow_to_schema(pyarrow_schema_nested))
+    expected = str(table_schema_nested)

Review Comment:
   I don't think the schema-id is encoded in the Parquet file, but that's okay. 
The validation was done on write time, and on read time, we turn the schema 
into the requested schema. Not having the `identifier_field_ids` is okay too; 
they should be provided by the table schema.



##########
python/pyiceberg/io/pyarrow.py:
##########
@@ -476,6 +483,202 @@ def expression_to_pyarrow(expr: BooleanExpression) -> 
pc.Expression:
     return boolean_expression_visit(expr, _ConvertToArrowExpression())
 
 
+def pyarrow_to_schema(schema: pa.Schema) -> Schema:
+    visitor = _ConvertToIceberg()
+    struct_results = []
+    for i in range(len(schema.names)):
+        field = schema.field(i)
+        visitor.before_field(field)
+        struct_result = visit_pyarrow(field.type, visitor)
+        visitor.after_field(field)
+        struct_results.append(struct_result)
+    return visitor.schema(schema, struct_results)
+
+
+@singledispatch
+def visit_pyarrow(obj: pa.DataType, visitor: PyarrowSchemaVisitor[T]) -> T:
+    """A generic function for applying a pyarrow schema visitor to any point 
within a schema
+
+    The function traverses the schema in post-order fashion
+
+    Args:
+        obj(Schema | IcebergType): An instance of a Schema or an IcebergType
+        visitor (PyarrowSchemaVisitor[T]): An instance of an implementation of 
the generic PyarrowSchemaVisitor base class
+
+    Raises:
+        NotImplementedError: If attempting to visit an unrecognized object type
+    """
+    raise NotImplementedError("Cannot visit non-type: %s" % obj)
+
+
+@visit_pyarrow.register(pa.StructType)
+def _(obj: pa.StructType, visitor: PyarrowSchemaVisitor[T]) -> T:
+    struct_results = []
+    for field in obj:
+        visitor.before_field(field)
+        struct_result = visit_pyarrow(field.type, visitor)
+        visitor.after_field(field)
+        struct_results.append(struct_result)
+
+    return visitor.struct(obj, struct_results)
+
+
+@visit_pyarrow.register(pa.ListType)
+def _(obj: pa.ListType, visitor: PyarrowSchemaVisitor[T]) -> T:
+    visitor.before_list_element(obj.value_field)
+    list_result = visit_pyarrow(obj.value_field.type, visitor)
+    visitor.after_list_element(obj.value_field)
+    return visitor.list(obj, list_result)
+
+
+@visit_pyarrow.register(pa.MapType)
+def _(obj: pa.MapType, visitor: PyarrowSchemaVisitor[T]) -> T:
+    visitor.before_map_key(obj.key_field)
+    key_result = visit_pyarrow(obj.key_field.type, visitor)
+    visitor.after_map_key(obj.key_field)
+    visitor.before_map_value(obj.item_field)
+    value_result = visit_pyarrow(obj.item_field.type, visitor)
+    visitor.after_map_value(obj.item_field)
+    return visitor.map(obj, key_result, value_result)
+
+
+@visit_pyarrow.register(pa.DataType)
+def _(obj: pa.DataType, visitor: PyarrowSchemaVisitor[T]) -> T:
+    if pa.types.is_nested(obj):
+        raise TypeError(f"Expected primitive type, got {type(obj)}")
+    return visitor.primitive(obj)
+
+
+class PyarrowSchemaVisitor(Generic[T], ABC):
+    def before_field(self, field: pa.Field) -> None:
+        """Override this method to perform an action immediately before 
visiting a field."""
+
+    def after_field(self, field: pa.Field) -> None:
+        """Override this method to perform an action immediately after 
visiting a field."""
+
+    def before_list_element(self, element: pa.Field) -> None:
+        """Override this method to perform an action immediately before 
visiting a list element."""
+
+    def after_list_element(self, element: pa.Field) -> None:
+        """Override this method to perform an action immediately after 
visiting a list element."""
+
+    def before_map_key(self, key: pa.Field) -> None:
+        """Override this method to perform an action immediately before 
visiting a map key."""
+
+    def after_map_key(self, key: pa.Field) -> None:
+        """Override this method to perform an action immediately after 
visiting a map key."""
+
+    def before_map_value(self, value: pa.Field) -> None:
+        """Override this method to perform an action immediately before 
visiting a map value."""
+
+    def after_map_value(self, value: pa.Field) -> None:
+        """Override this method to perform an action immediately after 
visiting a map value."""
+
+    @abstractmethod
+    def schema(self, schema: pa.Schema, field_results: List[T]) -> Schema:
+        """visit a schema"""
+
+    @abstractmethod
+    def struct(self, struct: pa.StructType, field_results: List[T]) -> T:
+        """visit a struct"""
+
+    @abstractmethod
+    def list(self, list_type: pa.ListType, element_result: T) -> T:
+        """visit a list"""
+
+    @abstractmethod
+    def map(self, map_type: pa.MapType, key_result: T, value_result: T) -> T:
+        """visit a map"""
+
+    @abstractmethod
+    def primitive(self, primitive: pa.DataType) -> T:
+        """visit a primitive type"""
+
+
+def _get_field_id(field: pa.Field) -> int:
+    if field.metadata is not None:
+        field_metadata = {k.decode(): v.decode() for k, v in 
field.metadata.items()}
+        if field_id := field_metadata.get("PARQUET:field_id"):
+            return int(field_id)
+    raise ValueError(f"Field {field.name} does not have a field_id")

Review Comment:
   Nit: we use the following pattern quite a lot in PyIceberg:
   ```suggestion
       raise ValueError(f"Field does not have a field_id: {field.name}")
   ```



##########
python/pyiceberg/io/pyarrow.py:
##########
@@ -476,6 +483,202 @@ def expression_to_pyarrow(expr: BooleanExpression) -> 
pc.Expression:
     return boolean_expression_visit(expr, _ConvertToArrowExpression())
 
 
+def pyarrow_to_schema(schema: pa.Schema) -> Schema:
+    visitor = _ConvertToIceberg()
+    struct_results = []
+    for i in range(len(schema.names)):
+        field = schema.field(i)
+        visitor.before_field(field)
+        struct_result = visit_pyarrow(field.type, visitor)
+        visitor.after_field(field)
+        struct_results.append(struct_result)
+    return visitor.schema(schema, struct_results)
+
+
+@singledispatch
+def visit_pyarrow(obj: pa.DataType, visitor: PyarrowSchemaVisitor[T]) -> T:
+    """A generic function for applying a pyarrow schema visitor to any point 
within a schema
+
+    The function traverses the schema in post-order fashion
+
+    Args:
+        obj(Schema | IcebergType): An instance of a Schema or an IcebergType
+        visitor (PyarrowSchemaVisitor[T]): An instance of an implementation of 
the generic PyarrowSchemaVisitor base class
+
+    Raises:
+        NotImplementedError: If attempting to visit an unrecognized object type
+    """
+    raise NotImplementedError("Cannot visit non-type: %s" % obj)
+
+
+@visit_pyarrow.register(pa.StructType)
+def _(obj: pa.StructType, visitor: PyarrowSchemaVisitor[T]) -> T:
+    struct_results = []
+    for field in obj:
+        visitor.before_field(field)
+        struct_result = visit_pyarrow(field.type, visitor)
+        visitor.after_field(field)
+        struct_results.append(struct_result)
+
+    return visitor.struct(obj, struct_results)
+
+
+@visit_pyarrow.register(pa.ListType)
+def _(obj: pa.ListType, visitor: PyarrowSchemaVisitor[T]) -> T:
+    visitor.before_list_element(obj.value_field)
+    list_result = visit_pyarrow(obj.value_field.type, visitor)
+    visitor.after_list_element(obj.value_field)
+    return visitor.list(obj, list_result)
+
+
+@visit_pyarrow.register(pa.MapType)
+def _(obj: pa.MapType, visitor: PyarrowSchemaVisitor[T]) -> T:
+    visitor.before_map_key(obj.key_field)
+    key_result = visit_pyarrow(obj.key_field.type, visitor)
+    visitor.after_map_key(obj.key_field)
+    visitor.before_map_value(obj.item_field)
+    value_result = visit_pyarrow(obj.item_field.type, visitor)
+    visitor.after_map_value(obj.item_field)
+    return visitor.map(obj, key_result, value_result)
+
+
+@visit_pyarrow.register(pa.DataType)
+def _(obj: pa.DataType, visitor: PyarrowSchemaVisitor[T]) -> T:
+    if pa.types.is_nested(obj):
+        raise TypeError(f"Expected primitive type, got {type(obj)}")
+    return visitor.primitive(obj)
+
+
+class PyarrowSchemaVisitor(Generic[T], ABC):

Review Comment:
   This matches the casing that PyArrow itself uses:
   ```suggestion
   class PyArrowSchemaVisitor(Generic[T], ABC):
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org

Reply via email to