Fokko commented on code in PR #6997:
URL: https://github.com/apache/iceberg/pull/6997#discussion_r1125699210
##########
python/pyiceberg/io/pyarrow.py:
##########
@@ -15,6 +15,7 @@
# specific language governing permissions and limitations
# under the License.
# pylint: disable=redefined-outer-name,arguments-renamed
Review Comment:
Instead of adding another line, you can just append `fixme` (pylint message W0511) to the existing disable list:
```suggestion
# pylint: disable=redefined-outer-name,arguments-renamed,fixme
```
##########
python/tests/io/test_pyarrow.py:
##########
@@ -1130,3 +1131,15 @@ def
test_projection_filter_on_unknown_field(schema_int_str: Schema, file_int_str
_ = project(schema, [file_int_str], GreaterThan("unknown_field", "1"),
schema_int_str)
assert "Could not find field with name unknown_field, case_sensitive=True"
in str(exc_info.value)
+
+
+def test_pyarrow_to_schema_simple(table_schema_simple: Schema,
pyarrow_schema_simple: pa.Schema) -> None:
+ actual = str(pyarrow_to_schema(pyarrow_schema_simple))
+ expected = str(table_schema_simple)
+ assert actual == expected
+
+
+def test_pyarrow_to_schema_nested(table_schema_nested: Schema,
pyarrow_schema_nested: pa.Schema) -> None:
+ actual = str(pyarrow_to_schema(pyarrow_schema_nested))
Review Comment:
Thank you for giving this a try. I did some testing on my end, and adding
the fields works perfectly fine for Arrow. I created a table and read it
using PyIceberg:
```sql
CREATE TABLE default.maps_and_lists(
some_list ARRAY<string>,
some_amp MAP<string, int>
)
INSERT INTO default.maps_and_lists VALUES(array("a", "b", "c"), map("a", 1,
"b", 2, "c", 3))
```
With the changes that you suggested, I can read it straight into a dataframe:
```python
def test_vo():
from pyiceberg.catalog import load_catalog
cat = load_catalog('local')
tbl = cat.load_table('default.maps_and_lists')
df = tbl.scan().to_arrow()
print(df)
```
Which shows:
```
some_list: list<item: string>
child 0, item: string
some_amp: map<string, int32>
child 0, entries: struct<key: string not null, value: int32> not null
child 0, key: string not null
child 1, value: int32
----
some_list: [[["a","b","c"]]]
some_amp: [[keys:["a","b","c"]values:[1,2,3]]]
```
I'm in favor of changing this.
##########
python/pyiceberg/io/pyarrow.py:
##########
@@ -476,6 +483,202 @@ def expression_to_pyarrow(expr: BooleanExpression) ->
pc.Expression:
return boolean_expression_visit(expr, _ConvertToArrowExpression())
+def pyarrow_to_schema(schema: pa.Schema) -> Schema:
+ visitor = _ConvertToIceberg()
+ struct_results = []
+ for i in range(len(schema.names)):
+ field = schema.field(i)
+ visitor.before_field(field)
+ struct_result = visit_pyarrow(field.type, visitor)
+ visitor.after_field(field)
+ struct_results.append(struct_result)
+ return visitor.schema(schema, struct_results)
+
+
+@singledispatch
+def visit_pyarrow(obj: pa.DataType, visitor: PyarrowSchemaVisitor[T]) -> T:
+ """A generic function for applying a pyarrow schema visitor to any point
within a schema
+
+ The function traverses the schema in post-order fashion
+
+ Args:
+ obj(Schema | IcebergType): An instance of a Schema or an IcebergType
+ visitor (PyarrowSchemaVisitor[T]): An instance of an implementation of
the generic PyarrowSchemaVisitor base class
+
+ Raises:
+ NotImplementedError: If attempting to visit an unrecognized object type
+ """
+ raise NotImplementedError("Cannot visit non-type: %s" % obj)
+
+
+@visit_pyarrow.register(pa.StructType)
+def _(obj: pa.StructType, visitor: PyarrowSchemaVisitor[T]) -> T:
+ struct_results = []
+ for field in obj:
+ visitor.before_field(field)
+ struct_result = visit_pyarrow(field.type, visitor)
+ visitor.after_field(field)
+ struct_results.append(struct_result)
+
+ return visitor.struct(obj, struct_results)
+
+
+@visit_pyarrow.register(pa.ListType)
+def _(obj: pa.ListType, visitor: PyarrowSchemaVisitor[T]) -> T:
+ visitor.before_list_element(obj.value_field)
+ list_result = visit_pyarrow(obj.value_field.type, visitor)
+ visitor.after_list_element(obj.value_field)
+ return visitor.list(obj, list_result)
+
+
+@visit_pyarrow.register(pa.MapType)
+def _(obj: pa.MapType, visitor: PyarrowSchemaVisitor[T]) -> T:
+ visitor.before_map_key(obj.key_field)
+ key_result = visit_pyarrow(obj.key_field.type, visitor)
+ visitor.after_map_key(obj.key_field)
+ visitor.before_map_value(obj.item_field)
+ value_result = visit_pyarrow(obj.item_field.type, visitor)
+ visitor.after_map_value(obj.item_field)
+ return visitor.map(obj, key_result, value_result)
+
+
+@visit_pyarrow.register(pa.DataType)
+def _(obj: pa.DataType, visitor: PyarrowSchemaVisitor[T]) -> T:
+ if pa.types.is_nested(obj):
+ raise TypeError(f"Expected primitive type, got {type(obj)}")
+ return visitor.primitive(obj)
+
+
+class PyarrowSchemaVisitor(Generic[T], ABC):
+ def before_field(self, field: pa.Field) -> None:
+ """Override this method to perform an action immediately before
visiting a field."""
+
+ def after_field(self, field: pa.Field) -> None:
+ """Override this method to perform an action immediately after
visiting a field."""
+
+ def before_list_element(self, element: pa.Field) -> None:
+ """Override this method to perform an action immediately before
visiting a list element."""
+
+ def after_list_element(self, element: pa.Field) -> None:
+ """Override this method to perform an action immediately after
visiting a list element."""
+
+ def before_map_key(self, key: pa.Field) -> None:
+ """Override this method to perform an action immediately before
visiting a map key."""
+
+ def after_map_key(self, key: pa.Field) -> None:
+ """Override this method to perform an action immediately after
visiting a map key."""
+
+ def before_map_value(self, value: pa.Field) -> None:
+ """Override this method to perform an action immediately before
visiting a map value."""
+
+ def after_map_value(self, value: pa.Field) -> None:
+ """Override this method to perform an action immediately after
visiting a map value."""
+
+ @abstractmethod
+ def schema(self, schema: pa.Schema, field_results: List[T]) -> Schema:
+ """visit a schema"""
+
+ @abstractmethod
+ def struct(self, struct: pa.StructType, field_results: List[T]) -> T:
+ """visit a struct"""
+
+ @abstractmethod
+ def list(self, list_type: pa.ListType, element_result: T) -> T:
+ """visit a list"""
+
+ @abstractmethod
+ def map(self, map_type: pa.MapType, key_result: T, value_result: T) -> T:
+ """visit a map"""
+
+ @abstractmethod
+ def primitive(self, primitive: pa.DataType) -> T:
+ """visit a primitive type"""
+
+
+def _get_field_id(field: pa.Field) -> int:
+ if field.metadata is not None:
+ field_metadata = {k.decode(): v.decode() for k, v in
field.metadata.items()}
+ if field_id := field_metadata.get("PARQUET:field_id"):
Review Comment:
There will be other keys for ORC and Avro. I think we can start with this.
##########
python/pyiceberg/io/pyarrow.py:
##########
@@ -476,6 +483,202 @@ def expression_to_pyarrow(expr: BooleanExpression) ->
pc.Expression:
return boolean_expression_visit(expr, _ConvertToArrowExpression())
+def pyarrow_to_schema(schema: pa.Schema) -> Schema:
+ visitor = _ConvertToIceberg()
+ struct_results = []
+ for i in range(len(schema.names)):
+ field = schema.field(i)
+ visitor.before_field(field)
+ struct_result = visit_pyarrow(field.type, visitor)
+ visitor.after_field(field)
+ struct_results.append(struct_result)
+ return visitor.schema(schema, struct_results)
+
+
+@singledispatch
+def visit_pyarrow(obj: pa.DataType, visitor: PyarrowSchemaVisitor[T]) -> T:
+ """A generic function for applying a pyarrow schema visitor to any point
within a schema
+
+ The function traverses the schema in post-order fashion
+
+ Args:
+ obj(Schema | IcebergType): An instance of a Schema or an IcebergType
Review Comment:
```suggestion
obj (pa.DataType): An instance of a pyarrow DataType
```
##########
python/pyiceberg/io/pyarrow.py:
##########
@@ -492,11 +710,7 @@ def _file_to_table(
schema_raw = None
if metadata := parquet_schema.metadata:
schema_raw = metadata.get(ICEBERG_SCHEMA)
- if schema_raw is None:
- raise ValueError(
- "Iceberg schema is not embedded into the Parquet file, see
https://github.com/apache/iceberg/issues/6505"
- )
- file_schema = Schema.parse_raw(schema_raw)
+ file_schema = Schema.parse_raw(schema_raw) if schema_raw is not None
else pyarrow_to_schema(parquet_schema)
Review Comment:
Exactly!
##########
python/tests/io/test_pyarrow.py:
##########
@@ -1130,3 +1131,15 @@ def
test_projection_filter_on_unknown_field(schema_int_str: Schema, file_int_str
_ = project(schema, [file_int_str], GreaterThan("unknown_field", "1"),
schema_int_str)
assert "Could not find field with name unknown_field, case_sensitive=True"
in str(exc_info.value)
+
+
+def test_pyarrow_to_schema_simple(table_schema_simple: Schema,
pyarrow_schema_simple: pa.Schema) -> None:
+ actual = str(pyarrow_to_schema(pyarrow_schema_simple))
+ expected = str(table_schema_simple)
+ assert actual == expected
+
+
+def test_pyarrow_to_schema_nested(table_schema_nested: Schema,
pyarrow_schema_nested: pa.Schema) -> None:
+ actual = str(pyarrow_to_schema(pyarrow_schema_nested))
+ expected = str(table_schema_nested)
Review Comment:
Personally, I prefer to inline the expected string directly in the assertion:
```python
def test_pyarrow_to_schema_nested(table_schema_nested: Schema,
pyarrow_schema_nested: pa.Schema) -> None:
actual = str(pyarrow_to_schema(pyarrow_schema_nested))
assert actual == """table {
1: foo: optional string
2: bar: required int
3: baz: optional boolean
4: qux: required list<string>
6: quux: required map<string, map<string, int>>
11: location: required list<struct<13: latitude: optional float, 14:
longitude: optional float>>
15: person: optional struct<16: name: optional string, 17: age: required
int>
}"""
```
This way you can see what you're asserting, which gives me a sense of confidence :)
##########
python/tests/io/test_pyarrow.py:
##########
@@ -1130,3 +1131,15 @@ def
test_projection_filter_on_unknown_field(schema_int_str: Schema, file_int_str
_ = project(schema, [file_int_str], GreaterThan("unknown_field", "1"),
schema_int_str)
assert "Could not find field with name unknown_field, case_sensitive=True"
in str(exc_info.value)
+
+
+def test_pyarrow_to_schema_simple(table_schema_simple: Schema,
pyarrow_schema_simple: pa.Schema) -> None:
+ actual = str(pyarrow_to_schema(pyarrow_schema_simple))
+ expected = str(table_schema_simple)
+ assert actual == expected
+
+
+def test_pyarrow_to_schema_nested(table_schema_nested: Schema,
pyarrow_schema_nested: pa.Schema) -> None:
+ actual = str(pyarrow_to_schema(pyarrow_schema_nested))
+ expected = str(table_schema_nested)
Review Comment:
I don't think the schema-id is encoded in the Parquet file, but that's okay.
The validation was done at write time, and at read time we turn the schema
into the requested schema. Not having the `identifier_field_ids` is okay; they
should be provided by the table schema.
##########
python/pyiceberg/io/pyarrow.py:
##########
@@ -476,6 +483,202 @@ def expression_to_pyarrow(expr: BooleanExpression) ->
pc.Expression:
return boolean_expression_visit(expr, _ConvertToArrowExpression())
+def pyarrow_to_schema(schema: pa.Schema) -> Schema:
+ visitor = _ConvertToIceberg()
+ struct_results = []
+ for i in range(len(schema.names)):
+ field = schema.field(i)
+ visitor.before_field(field)
+ struct_result = visit_pyarrow(field.type, visitor)
+ visitor.after_field(field)
+ struct_results.append(struct_result)
+ return visitor.schema(schema, struct_results)
+
+
+@singledispatch
+def visit_pyarrow(obj: pa.DataType, visitor: PyarrowSchemaVisitor[T]) -> T:
+ """A generic function for applying a pyarrow schema visitor to any point
within a schema
+
+ The function traverses the schema in post-order fashion
+
+ Args:
+ obj(Schema | IcebergType): An instance of a Schema or an IcebergType
+ visitor (PyarrowSchemaVisitor[T]): An instance of an implementation of
the generic PyarrowSchemaVisitor base class
+
+ Raises:
+ NotImplementedError: If attempting to visit an unrecognized object type
+ """
+ raise NotImplementedError("Cannot visit non-type: %s" % obj)
+
+
+@visit_pyarrow.register(pa.StructType)
+def _(obj: pa.StructType, visitor: PyarrowSchemaVisitor[T]) -> T:
+ struct_results = []
+ for field in obj:
+ visitor.before_field(field)
+ struct_result = visit_pyarrow(field.type, visitor)
+ visitor.after_field(field)
+ struct_results.append(struct_result)
+
+ return visitor.struct(obj, struct_results)
+
+
+@visit_pyarrow.register(pa.ListType)
+def _(obj: pa.ListType, visitor: PyarrowSchemaVisitor[T]) -> T:
+ visitor.before_list_element(obj.value_field)
+ list_result = visit_pyarrow(obj.value_field.type, visitor)
+ visitor.after_list_element(obj.value_field)
+ return visitor.list(obj, list_result)
+
+
+@visit_pyarrow.register(pa.MapType)
+def _(obj: pa.MapType, visitor: PyarrowSchemaVisitor[T]) -> T:
+ visitor.before_map_key(obj.key_field)
+ key_result = visit_pyarrow(obj.key_field.type, visitor)
+ visitor.after_map_key(obj.key_field)
+ visitor.before_map_value(obj.item_field)
+ value_result = visit_pyarrow(obj.item_field.type, visitor)
+ visitor.after_map_value(obj.item_field)
+ return visitor.map(obj, key_result, value_result)
+
+
+@visit_pyarrow.register(pa.DataType)
+def _(obj: pa.DataType, visitor: PyarrowSchemaVisitor[T]) -> T:
+ if pa.types.is_nested(obj):
+ raise TypeError(f"Expected primitive type, got {type(obj)}")
+ return visitor.primitive(obj)
+
+
+class PyarrowSchemaVisitor(Generic[T], ABC):
+ def before_field(self, field: pa.Field) -> None:
+ """Override this method to perform an action immediately before
visiting a field."""
+
+ def after_field(self, field: pa.Field) -> None:
+ """Override this method to perform an action immediately after
visiting a field."""
+
+ def before_list_element(self, element: pa.Field) -> None:
+ """Override this method to perform an action immediately before
visiting a list element."""
+
+ def after_list_element(self, element: pa.Field) -> None:
+ """Override this method to perform an action immediately after
visiting a list element."""
+
+ def before_map_key(self, key: pa.Field) -> None:
+ """Override this method to perform an action immediately before
visiting a map key."""
+
+ def after_map_key(self, key: pa.Field) -> None:
+ """Override this method to perform an action immediately after
visiting a map key."""
+
+ def before_map_value(self, value: pa.Field) -> None:
+ """Override this method to perform an action immediately before
visiting a map value."""
+
+ def after_map_value(self, value: pa.Field) -> None:
+ """Override this method to perform an action immediately after
visiting a map value."""
+
+ @abstractmethod
+ def schema(self, schema: pa.Schema, field_results: List[T]) -> Schema:
+ """visit a schema"""
+
+ @abstractmethod
+ def struct(self, struct: pa.StructType, field_results: List[T]) -> T:
+ """visit a struct"""
+
+ @abstractmethod
+ def list(self, list_type: pa.ListType, element_result: T) -> T:
+ """visit a list"""
+
+ @abstractmethod
+ def map(self, map_type: pa.MapType, key_result: T, value_result: T) -> T:
+ """visit a map"""
+
+ @abstractmethod
+ def primitive(self, primitive: pa.DataType) -> T:
+ """visit a primitive type"""
+
+
+def _get_field_id(field: pa.Field) -> int:
+ if field.metadata is not None:
+ field_metadata = {k.decode(): v.decode() for k, v in
field.metadata.items()}
+ if field_id := field_metadata.get("PARQUET:field_id"):
+ return int(field_id)
+ raise ValueError(f"Field {field.name} does not have a field_id")
Review Comment:
Nit: we use the following pattern quite a lot in PyIceberg:
```suggestion
raise ValueError(f"Field does not have a field_id: {field.name}")
```
##########
python/pyiceberg/io/pyarrow.py:
##########
@@ -476,6 +483,202 @@ def expression_to_pyarrow(expr: BooleanExpression) ->
pc.Expression:
return boolean_expression_visit(expr, _ConvertToArrowExpression())
+def pyarrow_to_schema(schema: pa.Schema) -> Schema:
+ visitor = _ConvertToIceberg()
+ struct_results = []
+ for i in range(len(schema.names)):
+ field = schema.field(i)
+ visitor.before_field(field)
+ struct_result = visit_pyarrow(field.type, visitor)
+ visitor.after_field(field)
+ struct_results.append(struct_result)
+ return visitor.schema(schema, struct_results)
+
+
+@singledispatch
+def visit_pyarrow(obj: pa.DataType, visitor: PyarrowSchemaVisitor[T]) -> T:
+ """A generic function for applying a pyarrow schema visitor to any point
within a schema
+
+ The function traverses the schema in post-order fashion
+
+ Args:
+ obj(Schema | IcebergType): An instance of a Schema or an IcebergType
+ visitor (PyarrowSchemaVisitor[T]): An instance of an implementation of
the generic PyarrowSchemaVisitor base class
+
+ Raises:
+ NotImplementedError: If attempting to visit an unrecognized object type
+ """
+ raise NotImplementedError("Cannot visit non-type: %s" % obj)
+
+
+@visit_pyarrow.register(pa.StructType)
+def _(obj: pa.StructType, visitor: PyarrowSchemaVisitor[T]) -> T:
+ struct_results = []
+ for field in obj:
+ visitor.before_field(field)
+ struct_result = visit_pyarrow(field.type, visitor)
+ visitor.after_field(field)
+ struct_results.append(struct_result)
+
+ return visitor.struct(obj, struct_results)
+
+
+@visit_pyarrow.register(pa.ListType)
+def _(obj: pa.ListType, visitor: PyarrowSchemaVisitor[T]) -> T:
+ visitor.before_list_element(obj.value_field)
+ list_result = visit_pyarrow(obj.value_field.type, visitor)
+ visitor.after_list_element(obj.value_field)
+ return visitor.list(obj, list_result)
+
+
+@visit_pyarrow.register(pa.MapType)
+def _(obj: pa.MapType, visitor: PyarrowSchemaVisitor[T]) -> T:
+ visitor.before_map_key(obj.key_field)
+ key_result = visit_pyarrow(obj.key_field.type, visitor)
+ visitor.after_map_key(obj.key_field)
+ visitor.before_map_value(obj.item_field)
+ value_result = visit_pyarrow(obj.item_field.type, visitor)
+ visitor.after_map_value(obj.item_field)
+ return visitor.map(obj, key_result, value_result)
+
+
+@visit_pyarrow.register(pa.DataType)
+def _(obj: pa.DataType, visitor: PyarrowSchemaVisitor[T]) -> T:
+ if pa.types.is_nested(obj):
+ raise TypeError(f"Expected primitive type, got {type(obj)}")
+ return visitor.primitive(obj)
+
+
+class PyarrowSchemaVisitor(Generic[T], ABC):
Review Comment:
This is the casing that PyArrow also uses.
```suggestion
class PyArrowSchemaVisitor(Generic[T], ABC):
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]