syun64 commented on code in PR #219:
URL: https://github.com/apache/iceberg-python/pull/219#discussion_r1451023397
##########
pyiceberg/io/pyarrow.py:
##########
@@ -698,77 +708,143 @@ def before_field(self, field: pa.Field) -> None:
def after_field(self, field: pa.Field) -> None:
"""Override this method to perform an action immediately after
visiting a field."""
+ def before_list_element(self, element: pa.Field) -> None:
+ """Override this method to perform an action immediately before
visiting an element within a ListType."""
+
+ def after_list_element(self, element: pa.Field) -> None:
+ """Override this method to perform an action immediately after
visiting an element within a ListType."""
+
+ def before_map_key(self, key: pa.Field) -> None:
+ """Override this method to perform an action immediately before
visiting a key within a MapType."""
+
+ def after_map_key(self, key: pa.Field) -> None:
+ """Override this method to perform an action immediately after
visiting a key within a MapType."""
+
+ def before_map_value(self, value: pa.Field) -> None:
+ """Override this method to perform an action immediately before
visiting a value within a MapType."""
+
+ def after_map_value(self, value: pa.Field) -> None:
+ """Override this method to perform an action immediately after
visiting a value within a MapType."""
+
@abstractmethod
- def schema(self, schema: pa.Schema, field_results: List[Optional[T]]) ->
Optional[T]:
+ def schema(self, schema: pa.Schema, struct_result: T) -> T:
"""Visit a schema."""
@abstractmethod
- def struct(self, struct: pa.StructType, field_results: List[Optional[T]])
-> Optional[T]:
+ def struct(self, struct: pa.StructType, field_results: List[T]) -> T:
"""Visit a struct."""
@abstractmethod
- def list(self, list_type: pa.ListType, element_result: Optional[T]) ->
Optional[T]:
+ def field(self, field: pa.Field, field_result: T) -> T:
+ """Visit a field."""
+
+ @abstractmethod
+ def list(self, list_type: pa.ListType, element_result: T) -> T:
"""Visit a list."""
@abstractmethod
- def map(self, map_type: pa.MapType, key_result: Optional[T], value_result:
Optional[T]) -> Optional[T]:
+ def map(self, map_type: pa.MapType, key_result: T, value_result: T) -> T:
"""Visit a map."""
@abstractmethod
- def primitive(self, primitive: pa.DataType) -> Optional[T]:
+ def primitive(self, primitive: pa.DataType) -> T:
"""Visit a primitive type."""
def _get_field_id(field: pa.Field) -> Optional[int]:
- for pyarrow_field_id_key in PYARROW_FIELD_ID_KEYS:
- if field_id_str := field.metadata.get(pyarrow_field_id_key):
- return int(field_id_str.decode())
+ if field.metadata:
+ for pyarrow_field_id_key in PYARROW_FIELD_ID_KEYS:
+ if field_id_str := field.metadata.get(pyarrow_field_id_key):
+ return int(field_id_str.decode())
return None
def _get_field_doc(field: pa.Field) -> Optional[str]:
- for pyarrow_doc_key in PYARROW_FIELD_DOC_KEYS:
- if doc_str := field.metadata.get(pyarrow_doc_key):
- return doc_str.decode()
+ if field.metadata:
+ for pyarrow_doc_key in PYARROW_FIELD_DOC_KEYS:
+ if doc_str := field.metadata.get(pyarrow_doc_key):
+ return doc_str.decode()
return None
-class _ConvertToIceberg(PyArrowSchemaVisitor[Union[IcebergType, Schema]]):
- def _convert_fields(self, arrow_fields: Iterable[pa.Field], field_results:
List[Optional[IcebergType]]) -> List[NestedField]:
- fields = []
- for i, field in enumerate(arrow_fields):
- field_id = _get_field_id(field)
- field_doc = _get_field_doc(field)
- field_type = field_results[i]
- if field_type is not None and field_id is not None:
- fields.append(NestedField(field_id, field.name, field_type,
required=not field.nullable, doc=field_doc))
- return fields
-
- def schema(self, schema: pa.Schema, field_results:
List[Optional[IcebergType]]) -> Schema:
- return Schema(*self._convert_fields(schema, field_results))
-
- def struct(self, struct: pa.StructType, field_results:
List[Optional[IcebergType]]) -> IcebergType:
- return StructType(*self._convert_fields(struct, field_results))
-
- def list(self, list_type: pa.ListType, element_result:
Optional[IcebergType]) -> Optional[IcebergType]:
+class _HasIds(PyArrowSchemaVisitor[bool]):
+ def schema(self, schema: pa.Schema, struct_result: bool) -> bool:
+ return struct_result
+
+ def struct(self, struct: pa.StructType, field_results: List[bool]) -> bool:
+ return all(field_results)
+
+ def field(self, field: pa.Field, field_result: bool) -> bool:
+ return all([_get_field_id(field) is not None, field_result])
+
+ def list(self, list_type: pa.ListType, element_result: bool) -> bool:
element_field = list_type.value_field
element_id = _get_field_id(element_field)
- if element_result is not None and element_id is not None:
- return ListType(element_id, element_result, element_required=not
element_field.nullable)
- return None
+ return element_result and element_id is not None
- def map(
- self, map_type: pa.MapType, key_result: Optional[IcebergType],
value_result: Optional[IcebergType]
- ) -> Optional[IcebergType]:
+ def map(self, map_type: pa.MapType, key_result: bool, value_result: bool)
-> bool:
key_field = map_type.key_field
key_id = _get_field_id(key_field)
value_field = map_type.item_field
value_id = _get_field_id(value_field)
- if key_result is not None and value_result is not None and key_id is
not None and value_id is not None:
- return MapType(key_id, key_result, value_id, value_result,
value_required=not value_field.nullable)
- return None
+ return all([key_id is not None, value_id is not None, key_result,
value_result])
+
+ def primitive(self, primitive: pa.DataType) -> bool:
+ return True
- def primitive(self, primitive: pa.DataType) -> IcebergType:
+
+class _ConvertToIceberg(PyArrowSchemaVisitor[Union[IcebergType, Schema]]):
+ """Converts PyArrowSchema to Iceberg Schema. Applies the IDs from
name_mapping if provided."""
+
+ field_names: List[str]
+ name_mapping: Optional[NameMapping]
+
+ def __init__(self, name_mapping: Optional[NameMapping] = None) -> None:
+ self.field_names = []
+ self.name_mapping = name_mapping
+
+ def _current_path(self) -> str:
+ return ".".join(self.field_names)
+
+ def _get_field_id(self, field: pa.Field) -> int:
Review Comment:
Hi @Fokko, thank you for your response. I think the use case I'm concerned
with is similar to the one that @robtandy brought up in the [open Write
Support
PR](https://github.com/apache/iceberg-python/pull/41/files#r1440972324): a
user wants to create a new Iceberg table when they have an in-memory Arrow
table available. What we are proposing here sounds like:
1. Get the Arrow Schema from the table (this does not have field.metadata or
field_ids)
2. Convert the Arrow Schema to PyIceberg Schema (using pyarrow_to_schema,
which uses _ConvertToIceberg)
3. Assign fresh IDs to the PyIceberg Schema
4. Use the PyIceberg Schema in create_table call
I think that, right now, (2) would fail, because _ConvertToIceberg requires
that the field_ids or field_metadata be present in the Arrow fields, or that
we provide a name_mapping. From my understanding, _ConvertToIceberg is the only
schema conversion class that takes an Arrow schema as input and outputs a
PyIceberg schema. I think asking the user to provide the name_mapping leads
to the same problem, because the function that generates the name mapping also
requires a PyIceberg Schema as its input.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]