Fokko commented on code in PR #6997: URL: https://github.com/apache/iceberg/pull/6997#discussion_r1125699210
########## python/pyiceberg/io/pyarrow.py: ########## @@ -15,6 +15,7 @@ # specific language governing permissions and limitations # under the License. # pylint: disable=redefined-outer-name,arguments-renamed Review Comment: Instead of adding another line, you can just append `fixme` (pylint's symbolic name for `W0511`) to this existing line: ```suggestion # pylint: disable=redefined-outer-name,arguments-renamed,fixme ``` ########## python/tests/io/test_pyarrow.py: ########## @@ -1130,3 +1131,15 @@ def test_projection_filter_on_unknown_field(schema_int_str: Schema, file_int_str _ = project(schema, [file_int_str], GreaterThan("unknown_field", "1"), schema_int_str) assert "Could not find field with name unknown_field, case_sensitive=True" in str(exc_info.value) + + +def test_pyarrow_to_schema_simple(table_schema_simple: Schema, pyarrow_schema_simple: pa.Schema) -> None: + actual = str(pyarrow_to_schema(pyarrow_schema_simple)) + expected = str(table_schema_simple) + assert actual == expected + + +def test_pyarrow_to_schema_nested(table_schema_nested: Schema, pyarrow_schema_nested: pa.Schema) -> None: + actual = str(pyarrow_to_schema(pyarrow_schema_nested)) Review Comment: Thank you for giving this a try. I did some testing on my end, and adding the fields works perfectly fine for Arrow.
I've created a table and read it using PyIceberg: ```sql CREATE TABLE default.maps_and_lists( some_list ARRAY<string>, some_amp MAP<string, int> ); INSERT INTO default.maps_and_lists VALUES(array("a", "b", "c"), map("a", 1, "b", 2, "c", 3)); ``` With the changes that you suggested, I can just read it into a dataframe: ```python def test_vo(): from pyiceberg.catalog import load_catalog cat = load_catalog('local') tbl = cat.load_table('default.maps_and_lists') df = tbl.scan().to_arrow() print(df) ``` This shows: ``` some_list: list<item: string> child 0, item: string some_amp: map<string, int32> child 0, entries: struct<key: string not null, value: int32> not null child 0, key: string not null child 1, value: int32 ---- some_list: [[["a","b","c"]]] some_amp: [[keys:["a","b","c"]values:[1,2,3]]] ``` I'm in favor of changing this. ########## python/pyiceberg/io/pyarrow.py: ########## @@ -476,6 +483,202 @@ def expression_to_pyarrow(expr: BooleanExpression) -> pc.Expression: return boolean_expression_visit(expr, _ConvertToArrowExpression()) +def pyarrow_to_schema(schema: pa.Schema) -> Schema: + visitor = _ConvertToIceberg() + struct_results = [] + for i in range(len(schema.names)): + field = schema.field(i) + visitor.before_field(field) + struct_result = visit_pyarrow(field.type, visitor) + visitor.after_field(field) + struct_results.append(struct_result) + return visitor.schema(schema, struct_results) + + +@singledispatch +def visit_pyarrow(obj: pa.DataType, visitor: PyarrowSchemaVisitor[T]) -> T: + """A generic function for applying a pyarrow schema visitor to any point within a schema + + The function traverses the schema in post-order fashion + + Args: + obj(Schema | IcebergType): An instance of a Schema or an IcebergType + visitor (PyarrowSchemaVisitor[T]): An instance of an implementation of the generic PyarrowSchemaVisitor base class + + Raises: + NotImplementedError: If attempting to visit an unrecognized object type + """ + raise NotImplementedError("Cannot 
visit non-type: %s" % obj) + + +@visit_pyarrow.register(pa.StructType) +def _(obj: pa.StructType, visitor: PyarrowSchemaVisitor[T]) -> T: + struct_results = [] + for field in obj: + visitor.before_field(field) + struct_result = visit_pyarrow(field.type, visitor) + visitor.after_field(field) + struct_results.append(struct_result) + + return visitor.struct(obj, struct_results) + + +@visit_pyarrow.register(pa.ListType) +def _(obj: pa.ListType, visitor: PyarrowSchemaVisitor[T]) -> T: + visitor.before_list_element(obj.value_field) + list_result = visit_pyarrow(obj.value_field.type, visitor) + visitor.after_list_element(obj.value_field) + return visitor.list(obj, list_result) + + +@visit_pyarrow.register(pa.MapType) +def _(obj: pa.MapType, visitor: PyarrowSchemaVisitor[T]) -> T: + visitor.before_map_key(obj.key_field) + key_result = visit_pyarrow(obj.key_field.type, visitor) + visitor.after_map_key(obj.key_field) + visitor.before_map_value(obj.item_field) + value_result = visit_pyarrow(obj.item_field.type, visitor) + visitor.after_map_value(obj.item_field) + return visitor.map(obj, key_result, value_result) + + +@visit_pyarrow.register(pa.DataType) +def _(obj: pa.DataType, visitor: PyarrowSchemaVisitor[T]) -> T: + if pa.types.is_nested(obj): + raise TypeError(f"Expected primitive type, got {type(obj)}") + return visitor.primitive(obj) + + +class PyarrowSchemaVisitor(Generic[T], ABC): + def before_field(self, field: pa.Field) -> None: + """Override this method to perform an action immediately before visiting a field.""" + + def after_field(self, field: pa.Field) -> None: + """Override this method to perform an action immediately after visiting a field.""" + + def before_list_element(self, element: pa.Field) -> None: + """Override this method to perform an action immediately before visiting a list element.""" + + def after_list_element(self, element: pa.Field) -> None: + """Override this method to perform an action immediately after visiting a list element.""" + + def 
before_map_key(self, key: pa.Field) -> None: + """Override this method to perform an action immediately before visiting a map key.""" + + def after_map_key(self, key: pa.Field) -> None: + """Override this method to perform an action immediately after visiting a map key.""" + + def before_map_value(self, value: pa.Field) -> None: + """Override this method to perform an action immediately before visiting a map value.""" + + def after_map_value(self, value: pa.Field) -> None: + """Override this method to perform an action immediately after visiting a map value.""" + + @abstractmethod + def schema(self, schema: pa.Schema, field_results: List[T]) -> Schema: + """visit a schema""" + + @abstractmethod + def struct(self, struct: pa.StructType, field_results: List[T]) -> T: + """visit a struct""" + + @abstractmethod + def list(self, list_type: pa.ListType, element_result: T) -> T: + """visit a list""" + + @abstractmethod + def map(self, map_type: pa.MapType, key_result: T, value_result: T) -> T: + """visit a map""" + + @abstractmethod + def primitive(self, primitive: pa.DataType) -> T: + """visit a primitive type""" + + +def _get_field_id(field: pa.Field) -> int: + if field.metadata is not None: + field_metadata = {k.decode(): v.decode() for k, v in field.metadata.items()} + if field_id := field_metadata.get("PARQUET:field_id"): Review Comment: There will be other keys for ORC and Avro. I think we can start with this. 
########## python/pyiceberg/io/pyarrow.py: ########## @@ -476,6 +483,202 @@ def expression_to_pyarrow(expr: BooleanExpression) -> pc.Expression: return boolean_expression_visit(expr, _ConvertToArrowExpression()) +def pyarrow_to_schema(schema: pa.Schema) -> Schema: + visitor = _ConvertToIceberg() + struct_results = [] + for i in range(len(schema.names)): + field = schema.field(i) + visitor.before_field(field) + struct_result = visit_pyarrow(field.type, visitor) + visitor.after_field(field) + struct_results.append(struct_result) + return visitor.schema(schema, struct_results) + + +@singledispatch +def visit_pyarrow(obj: pa.DataType, visitor: PyarrowSchemaVisitor[T]) -> T: + """A generic function for applying a pyarrow schema visitor to any point within a schema + + The function traverses the schema in post-order fashion + + Args: + obj(Schema | IcebergType): An instance of a Schema or an IcebergType Review Comment: `visit_pyarrow` only dispatches on PyArrow data types, so the docstring should say so: ```suggestion obj (pa.DataType): An instance of a PyArrow DataType ``` ########## python/pyiceberg/io/pyarrow.py: ########## @@ -492,11 +710,7 @@ def _file_to_table( schema_raw = None if metadata := parquet_schema.metadata: schema_raw = metadata.get(ICEBERG_SCHEMA) - if schema_raw is None: - raise ValueError( - "Iceberg schema is not embedded into the Parquet file, see https://github.com/apache/iceberg/issues/6505" - ) - file_schema = Schema.parse_raw(schema_raw) + file_schema = Schema.parse_raw(schema_raw) if schema_raw is not None else pyarrow_to_schema(parquet_schema) Review Comment: Exactly!
########## python/tests/io/test_pyarrow.py: ########## @@ -1130,3 +1131,15 @@ def test_projection_filter_on_unknown_field(schema_int_str: Schema, file_int_str _ = project(schema, [file_int_str], GreaterThan("unknown_field", "1"), schema_int_str) assert "Could not find field with name unknown_field, case_sensitive=True" in str(exc_info.value) + + +def test_pyarrow_to_schema_simple(table_schema_simple: Schema, pyarrow_schema_simple: pa.Schema) -> None: + actual = str(pyarrow_to_schema(pyarrow_schema_simple)) + expected = str(table_schema_simple) + assert actual == expected + + +def test_pyarrow_to_schema_nested(table_schema_nested: Schema, pyarrow_schema_nested: pa.Schema) -> None: + actual = str(pyarrow_to_schema(pyarrow_schema_nested)) + expected = str(table_schema_nested) Review Comment: Personally, I prefer to write the expected value inline as a literal string in the assertion: ```python def test_pyarrow_to_schema_nested(table_schema_nested: Schema, pyarrow_schema_nested: pa.Schema) -> None: actual = str(pyarrow_to_schema(pyarrow_schema_nested)) assert actual == """table { 1: foo: optional string 2: bar: required int 3: baz: optional boolean 4: qux: required list<string> 6: quux: required map<string, map<string, int>> 11: location: required list<struct<13: latitude: optional float, 14: longitude: optional float>> 15: person: optional struct<16: name: optional string, 17: age: required int> }""" ``` This way you can see exactly what you're asserting.
Gives me a sense of confidence :) ########## python/tests/io/test_pyarrow.py: ########## @@ -1130,3 +1131,15 @@ def test_projection_filter_on_unknown_field(schema_int_str: Schema, file_int_str _ = project(schema, [file_int_str], GreaterThan("unknown_field", "1"), schema_int_str) assert "Could not find field with name unknown_field, case_sensitive=True" in str(exc_info.value) + + +def test_pyarrow_to_schema_simple(table_schema_simple: Schema, pyarrow_schema_simple: pa.Schema) -> None: + actual = str(pyarrow_to_schema(pyarrow_schema_simple)) + expected = str(table_schema_simple) + assert actual == expected + + +def test_pyarrow_to_schema_nested(table_schema_nested: Schema, pyarrow_schema_nested: pa.Schema) -> None: + actual = str(pyarrow_to_schema(pyarrow_schema_nested)) + expected = str(table_schema_nested) Review Comment: I don't think the schema-id is encoded in the Parquet file, but that's okay. The validation was done at write time, and at read time we turn the schema into the requested schema. Not having the `identifier_field_ids` is okay; it should be provided by the table schema.
########## python/pyiceberg/io/pyarrow.py: ########## @@ -476,6 +483,202 @@ def expression_to_pyarrow(expr: BooleanExpression) -> pc.Expression: return boolean_expression_visit(expr, _ConvertToArrowExpression()) +def pyarrow_to_schema(schema: pa.Schema) -> Schema: + visitor = _ConvertToIceberg() + struct_results = [] + for i in range(len(schema.names)): + field = schema.field(i) + visitor.before_field(field) + struct_result = visit_pyarrow(field.type, visitor) + visitor.after_field(field) + struct_results.append(struct_result) + return visitor.schema(schema, struct_results) + + +@singledispatch +def visit_pyarrow(obj: pa.DataType, visitor: PyarrowSchemaVisitor[T]) -> T: + """A generic function for applying a pyarrow schema visitor to any point within a schema + + The function traverses the schema in post-order fashion + + Args: + obj(Schema | IcebergType): An instance of a Schema or an IcebergType + visitor (PyarrowSchemaVisitor[T]): An instance of an implementation of the generic PyarrowSchemaVisitor base class + + Raises: + NotImplementedError: If attempting to visit an unrecognized object type + """ + raise NotImplementedError("Cannot visit non-type: %s" % obj) + + +@visit_pyarrow.register(pa.StructType) +def _(obj: pa.StructType, visitor: PyarrowSchemaVisitor[T]) -> T: + struct_results = [] + for field in obj: + visitor.before_field(field) + struct_result = visit_pyarrow(field.type, visitor) + visitor.after_field(field) + struct_results.append(struct_result) + + return visitor.struct(obj, struct_results) + + +@visit_pyarrow.register(pa.ListType) +def _(obj: pa.ListType, visitor: PyarrowSchemaVisitor[T]) -> T: + visitor.before_list_element(obj.value_field) + list_result = visit_pyarrow(obj.value_field.type, visitor) + visitor.after_list_element(obj.value_field) + return visitor.list(obj, list_result) + + +@visit_pyarrow.register(pa.MapType) +def _(obj: pa.MapType, visitor: PyarrowSchemaVisitor[T]) -> T: + visitor.before_map_key(obj.key_field) + key_result = 
visit_pyarrow(obj.key_field.type, visitor) + visitor.after_map_key(obj.key_field) + visitor.before_map_value(obj.item_field) + value_result = visit_pyarrow(obj.item_field.type, visitor) + visitor.after_map_value(obj.item_field) + return visitor.map(obj, key_result, value_result) + + +@visit_pyarrow.register(pa.DataType) +def _(obj: pa.DataType, visitor: PyarrowSchemaVisitor[T]) -> T: + if pa.types.is_nested(obj): + raise TypeError(f"Expected primitive type, got {type(obj)}") + return visitor.primitive(obj) + + +class PyarrowSchemaVisitor(Generic[T], ABC): + def before_field(self, field: pa.Field) -> None: + """Override this method to perform an action immediately before visiting a field.""" + + def after_field(self, field: pa.Field) -> None: + """Override this method to perform an action immediately after visiting a field.""" + + def before_list_element(self, element: pa.Field) -> None: + """Override this method to perform an action immediately before visiting a list element.""" + + def after_list_element(self, element: pa.Field) -> None: + """Override this method to perform an action immediately after visiting a list element.""" + + def before_map_key(self, key: pa.Field) -> None: + """Override this method to perform an action immediately before visiting a map key.""" + + def after_map_key(self, key: pa.Field) -> None: + """Override this method to perform an action immediately after visiting a map key.""" + + def before_map_value(self, value: pa.Field) -> None: + """Override this method to perform an action immediately before visiting a map value.""" + + def after_map_value(self, value: pa.Field) -> None: + """Override this method to perform an action immediately after visiting a map value.""" + + @abstractmethod + def schema(self, schema: pa.Schema, field_results: List[T]) -> Schema: + """visit a schema""" + + @abstractmethod + def struct(self, struct: pa.StructType, field_results: List[T]) -> T: + """visit a struct""" + + @abstractmethod + def list(self, 
list_type: pa.ListType, element_result: T) -> T: + """visit a list""" + + @abstractmethod + def map(self, map_type: pa.MapType, key_result: T, value_result: T) -> T: + """visit a map""" + + @abstractmethod + def primitive(self, primitive: pa.DataType) -> T: + """visit a primitive type""" + + +def _get_field_id(field: pa.Field) -> int: + if field.metadata is not None: + field_metadata = {k.decode(): v.decode() for k, v in field.metadata.items()} + if field_id := field_metadata.get("PARQUET:field_id"): + return int(field_id) + raise ValueError(f"Field {field.name} does not have a field_id") Review Comment: Nit, we use the following pattern quite a lot in PyIceberg: ```suggestion raise ValueError(f"Field does not have a field_id: {field.name}") ``` ########## python/pyiceberg/io/pyarrow.py: ########## @@ -476,6 +483,202 @@ def expression_to_pyarrow(expr: BooleanExpression) -> pc.Expression: return boolean_expression_visit(expr, _ConvertToArrowExpression()) +def pyarrow_to_schema(schema: pa.Schema) -> Schema: + visitor = _ConvertToIceberg() + struct_results = [] + for i in range(len(schema.names)): + field = schema.field(i) + visitor.before_field(field) + struct_result = visit_pyarrow(field.type, visitor) + visitor.after_field(field) + struct_results.append(struct_result) + return visitor.schema(schema, struct_results) + + +@singledispatch +def visit_pyarrow(obj: pa.DataType, visitor: PyarrowSchemaVisitor[T]) -> T: + """A generic function for applying a pyarrow schema visitor to any point within a schema + + The function traverses the schema in post-order fashion + + Args: + obj(Schema | IcebergType): An instance of a Schema or an IcebergType + visitor (PyarrowSchemaVisitor[T]): An instance of an implementation of the generic PyarrowSchemaVisitor base class + + Raises: + NotImplementedError: If attempting to visit an unrecognized object type + """ + raise NotImplementedError("Cannot visit non-type: %s" % obj) + + +@visit_pyarrow.register(pa.StructType) +def _(obj: 
pa.StructType, visitor: PyarrowSchemaVisitor[T]) -> T: + struct_results = [] + for field in obj: + visitor.before_field(field) + struct_result = visit_pyarrow(field.type, visitor) + visitor.after_field(field) + struct_results.append(struct_result) + + return visitor.struct(obj, struct_results) + + +@visit_pyarrow.register(pa.ListType) +def _(obj: pa.ListType, visitor: PyarrowSchemaVisitor[T]) -> T: + visitor.before_list_element(obj.value_field) + list_result = visit_pyarrow(obj.value_field.type, visitor) + visitor.after_list_element(obj.value_field) + return visitor.list(obj, list_result) + + +@visit_pyarrow.register(pa.MapType) +def _(obj: pa.MapType, visitor: PyarrowSchemaVisitor[T]) -> T: + visitor.before_map_key(obj.key_field) + key_result = visit_pyarrow(obj.key_field.type, visitor) + visitor.after_map_key(obj.key_field) + visitor.before_map_value(obj.item_field) + value_result = visit_pyarrow(obj.item_field.type, visitor) + visitor.after_map_value(obj.item_field) + return visitor.map(obj, key_result, value_result) + + +@visit_pyarrow.register(pa.DataType) +def _(obj: pa.DataType, visitor: PyarrowSchemaVisitor[T]) -> T: + if pa.types.is_nested(obj): + raise TypeError(f"Expected primitive type, got {type(obj)}") + return visitor.primitive(obj) + + +class PyarrowSchemaVisitor(Generic[T], ABC): Review Comment: This is the casing that PyArrow also uses. ```suggestion class PyArrowSchemaVisitor(Generic[T], ABC): ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org