JonasJ-ap commented on code in PR #8144:
URL: https://github.com/apache/iceberg/pull/8144#discussion_r1277005027
##########
python/pyiceberg/io/pyarrow.py:
##########
@@ -541,9 +542,16 @@ def _combine_positional_deletes(positional_deletes:
List[pa.ChunkedArray], rows:
return np.setdiff1d(np.arange(rows), all_chunks, assume_unique=False)
-def pyarrow_to_schema(schema: pa.Schema) -> Schema:
- visitor = _ConvertToIceberg()
- return visit_pyarrow(schema, visitor)
+def pyarrow_to_schema(
+ schema: pa.Schema,
+ projected_schema: Optional[Schema] = None,
+ match_with_field_name: bool = False,
+ ignore_unprojectable_fields: bool = False,
+) -> Schema:
+ visitor = _ConvertToIceberg(projected_schema, match_with_field_name,
ignore_unprojectable_fields)
+ ib_schema = visit_pyarrow(schema, visitor)
+ assert isinstance(ib_schema, StructType)
Review Comment:
Could you please replace it with a `ValueError` or something similar? According
to comments in other PRs, we try to avoid `assert` outside `tests/`.
##########
python/pyiceberg/io/pyarrow.py:
##########
@@ -724,6 +832,15 @@ def primitive(self, primitive: pa.DataType) -> IcebergType:
raise TypeError(f"Unsupported type: {primitive}")
+ def __init__(
+ self, projected_schema: Optional[Schema], match_with_field_name: bool
= False, ignore_unprojectable_fields: bool = False
Review Comment:
```suggestion
self, projected_schema: Optional[Schema] = None,
match_with_field_name: bool = False, ignore_unprojectable_fields: bool = False
```
With this change, all tests in `test_pyarrow_visitor.py` can pass without any
modification.
##########
python/pyiceberg/io/pyarrow.py:
##########
@@ -641,41 +649,125 @@ def primitive(self, primitive: pa.DataType) ->
Optional[T]:
def _get_field_id(field: pa.Field) -> Optional[int]:
- for pyarrow_field_id_key in PYARROW_FIELD_ID_KEYS:
- if field_id_str := field.metadata.get(pyarrow_field_id_key):
- return int(field_id_str.decode())
+ if field.metadata is not None:
+ for pyarrow_field_id_key in PYARROW_FIELD_ID_KEYS:
+ if field_id_str := field.metadata.get(pyarrow_field_id_key):
+ return int(field_id_str.decode())
return None
def _get_field_doc(field: pa.Field) -> Optional[str]:
- for pyarrow_doc_key in PYARROW_FIELD_DOC_KEYS:
- if doc_str := field.metadata.get(pyarrow_doc_key):
- return doc_str.decode()
+ if field.metadata is not None:
+ for pyarrow_doc_key in PYARROW_FIELD_DOC_KEYS:
+ if doc_str := field.metadata.get(pyarrow_doc_key):
+ return doc_str.decode()
return None
-class _ConvertToIceberg(PyArrowSchemaVisitor[Union[IcebergType, Schema]]):
+class _ConvertToIceberg(PyArrowSchemaVisitor[Union[IcebergType]]):
Review Comment:
```suggestion
class _ConvertToIceberg(PyArrowSchemaVisitor[IcebergType]):
```
I think we no longer need `Union`?
##########
python/tests/io/test_pyarrow.py:
##########
@@ -360,6 +361,65 @@ def test_schema_to_pyarrow_schema(table_schema_nested:
Schema) -> None:
assert repr(actual) == expected
+def test_pyarrow_to_schema(table_schema_simple: Schema, table_schema_nested:
Schema) -> None:
Review Comment:
Do you think it is a good idea to move these two tests to
`test_pyarrow_visitor.py` (which contains other tests for `pyarrow_to_schema`)? We
currently have too many tests in `test_pyarrow.py`. I think we may want to stop
adding new tests to it and consider refactoring it into different files.
##########
python/pyiceberg/io/pyarrow.py:
##########
@@ -749,7 +869,16 @@ def _task_to_table(
schema_raw = metadata.get(ICEBERG_SCHEMA)
# TODO: if field_ids are not present, Name Mapping should be
implemented to look them up in the table schema,
Review Comment:
I think this TODO can be removed.
##########
python/pyiceberg/io/pyarrow.py:
##########
@@ -641,41 +649,125 @@ def primitive(self, primitive: pa.DataType) ->
Optional[T]:
def _get_field_id(field: pa.Field) -> Optional[int]:
- for pyarrow_field_id_key in PYARROW_FIELD_ID_KEYS:
- if field_id_str := field.metadata.get(pyarrow_field_id_key):
- return int(field_id_str.decode())
+ if field.metadata is not None:
+ for pyarrow_field_id_key in PYARROW_FIELD_ID_KEYS:
+ if field_id_str := field.metadata.get(pyarrow_field_id_key):
+ return int(field_id_str.decode())
return None
def _get_field_doc(field: pa.Field) -> Optional[str]:
- for pyarrow_doc_key in PYARROW_FIELD_DOC_KEYS:
- if doc_str := field.metadata.get(pyarrow_doc_key):
- return doc_str.decode()
+ if field.metadata is not None:
+ for pyarrow_doc_key in PYARROW_FIELD_DOC_KEYS:
+ if doc_str := field.metadata.get(pyarrow_doc_key):
+ return doc_str.decode()
return None
-class _ConvertToIceberg(PyArrowSchemaVisitor[Union[IcebergType, Schema]]):
+class _ConvertToIceberg(PyArrowSchemaVisitor[Union[IcebergType]]):
+ projected_schema: Union[Schema, ListType, MapType, None]
+ match_with_field_name: bool
+ ignore_unprojectable_fields: bool
+ projected_schema_stack: List[Tuple[Schema | ListType | MapType | None,
Optional[_Literal["key", "value"]]]]
+ next: Optional[_Literal["key", "value"]]
+
def _convert_fields(self, arrow_fields: Iterable[pa.Field], field_results:
List[Optional[IcebergType]]) -> List[NestedField]:
fields = []
for i, field in enumerate(arrow_fields):
field_id = _get_field_id(field)
field_doc = _get_field_doc(field)
field_type = field_results[i]
- if field_type is not None and field_id is not None:
- fields.append(NestedField(field_id, field.name, field_type,
required=not field.nullable, doc=field_doc))
+ ib_field: Optional[NestedField] = None
+ if field_type is not None:
+ if field_id is not None:
+ ib_field = NestedField(field_id, field.name, field_type,
required=not field.nullable, doc=field_doc)
+ elif self.match_with_field_name:
+ if not isinstance(self.projected_schema, Schema):
+ raise ValueError("projected_schema must be provided if
match_with_field_name is set to True")
+ try:
+ projected_field =
self.projected_schema.find_field(field.name)
+ except ValueError as e:
+ if self.ignore_unprojectable_fields:
+ continue
+ raise ValueError(
+ f"could not find a field that corresponds to
{field.name} in projected schema {self.projected_schema}"
+ ) from e
+ ib_field = NestedField(
+ projected_field.field_id, field.name, field_type,
required=not field.nullable, doc=field_doc
+ )
+ if ib_field is not None:
+ fields.append(ib_field)
return fields
- def schema(self, schema: pa.Schema, field_results:
List[Optional[IcebergType]]) -> Schema:
- return Schema(*self._convert_fields(schema, field_results))
+ def schema(self, schema: pa.Schema, field_results:
List[Optional[IcebergType]]) -> StructType:
+ return StructType(*self._convert_fields(schema, field_results))
+
+ def before_field(self, field: pa.Field) -> None:
+ if not isinstance(field.type, (pa.StructType, pa.ListType,
pa.MapType)):
+ return
+
+ projected_field: Optional[NestedField] = None
+
+ if isinstance(self.projected_schema, Schema):
+ field_id = _get_field_id(field)
+ if field_id is not None:
+ try:
+ projected_field =
self.projected_schema.find_field(field_id)
+ except ValueError:
+ if not self.match_with_field_name:
+ raise
+ if projected_field is None and self.match_with_field_name:
+ projected_field = self.projected_schema.find_field(field.name)
Review Comment:
[Question] Do we need the try...except and the case on `ignore_unprojectable_fields`
here?
##########
python/pyiceberg/io/pyarrow.py:
##########
@@ -641,41 +649,125 @@ def primitive(self, primitive: pa.DataType) ->
Optional[T]:
def _get_field_id(field: pa.Field) -> Optional[int]:
- for pyarrow_field_id_key in PYARROW_FIELD_ID_KEYS:
- if field_id_str := field.metadata.get(pyarrow_field_id_key):
- return int(field_id_str.decode())
+ if field.metadata is not None:
+ for pyarrow_field_id_key in PYARROW_FIELD_ID_KEYS:
+ if field_id_str := field.metadata.get(pyarrow_field_id_key):
+ return int(field_id_str.decode())
return None
def _get_field_doc(field: pa.Field) -> Optional[str]:
- for pyarrow_doc_key in PYARROW_FIELD_DOC_KEYS:
- if doc_str := field.metadata.get(pyarrow_doc_key):
- return doc_str.decode()
+ if field.metadata is not None:
+ for pyarrow_doc_key in PYARROW_FIELD_DOC_KEYS:
+ if doc_str := field.metadata.get(pyarrow_doc_key):
+ return doc_str.decode()
return None
-class _ConvertToIceberg(PyArrowSchemaVisitor[Union[IcebergType, Schema]]):
+class _ConvertToIceberg(PyArrowSchemaVisitor[Union[IcebergType]]):
+ projected_schema: Union[Schema, ListType, MapType, None]
+ match_with_field_name: bool
+ ignore_unprojectable_fields: bool
+ projected_schema_stack: List[Tuple[Schema | ListType | MapType | None,
Optional[_Literal["key", "value"]]]]
+ next: Optional[_Literal["key", "value"]]
+
def _convert_fields(self, arrow_fields: Iterable[pa.Field], field_results:
List[Optional[IcebergType]]) -> List[NestedField]:
fields = []
for i, field in enumerate(arrow_fields):
field_id = _get_field_id(field)
field_doc = _get_field_doc(field)
field_type = field_results[i]
- if field_type is not None and field_id is not None:
- fields.append(NestedField(field_id, field.name, field_type,
required=not field.nullable, doc=field_doc))
+ ib_field: Optional[NestedField] = None
+ if field_type is not None:
+ if field_id is not None:
+ ib_field = NestedField(field_id, field.name, field_type,
required=not field.nullable, doc=field_doc)
+ elif self.match_with_field_name:
+ if not isinstance(self.projected_schema, Schema):
+ raise ValueError("projected_schema must be provided if
match_with_field_name is set to True")
+ try:
+ projected_field =
self.projected_schema.find_field(field.name)
+ except ValueError as e:
+ if self.ignore_unprojectable_fields:
+ continue
+ raise ValueError(
+ f"could not find a field that corresponds to
{field.name} in projected schema {self.projected_schema}"
+ ) from e
+ ib_field = NestedField(
+ projected_field.field_id, field.name, field_type,
required=not field.nullable, doc=field_doc
+ )
+ if ib_field is not None:
+ fields.append(ib_field)
return fields
- def schema(self, schema: pa.Schema, field_results:
List[Optional[IcebergType]]) -> Schema:
- return Schema(*self._convert_fields(schema, field_results))
+ def schema(self, schema: pa.Schema, field_results:
List[Optional[IcebergType]]) -> StructType:
+ return StructType(*self._convert_fields(schema, field_results))
+
+ def before_field(self, field: pa.Field) -> None:
+ if not isinstance(field.type, (pa.StructType, pa.ListType,
pa.MapType)):
+ return
+
+ projected_field: Optional[NestedField] = None
+
+ if isinstance(self.projected_schema, Schema):
+ field_id = _get_field_id(field)
+ if field_id is not None:
+ try:
+ projected_field =
self.projected_schema.find_field(field_id)
+ except ValueError:
+ if not self.match_with_field_name:
+ raise
+ if projected_field is None and self.match_with_field_name:
+ projected_field = self.projected_schema.find_field(field.name)
+ elif isinstance(self.projected_schema, ListType):
+ projected_field = self.projected_schema.element_field
+ elif isinstance(self.projected_schema, MapType):
+ if self.next == "key":
+ projected_field = self.projected_schema.key_field
+ elif self.next == "value":
+ projected_field = self.projected_schema.value_field
+ else:
+ raise AssertionError("should never get here")
+
+ self.projected_schema_stack.append((self.projected_schema, self.next))
+ inner_schema: Schema | ListType | MapType | None = None
+ if projected_field is not None:
+ field_type = projected_field.field_type
+ if isinstance(field_type, StructType):
+ inner_schema = Schema(*field_type.fields)
+ else:
+ if isinstance(field_type, (ListType, MapType)):
+ inner_schema = field_type
+
+ self.projected_schema = inner_schema
+ self.next = "key" if isinstance(field.type, pa.MapType) else None
+
+ def after_field(self, field: pa.Field) -> None:
+ if self.next == "key":
+ self.next = "value"
+ elif self.next == "value":
+ self.next = None
+ if not isinstance(field.type, (pa.StructType, pa.ListType,
pa.MapType)):
+ return
+ (self.projected_schema, self.next) = self.projected_schema_stack.pop()
Review Comment:
I think we may want to step `self.next` forward (key -> value, value ->
None) when popping from the stack, since in the `after_field` phase we have already
visited the field specified by the `self.next` stored on the stack.
This can cause an error when we have a MapType whose key and value types are
nested:
```
19: mapCol1: optional map<struct<47: innerStruct1: optional struct<49:
random1: optional long, 50: random2:
│ optional long>, 48: innerStruct2: optional
struct<51: random3: optional long, 52: random4: optional long>>,
│ struct<53: innerStruct3: optional struct<55: col1:
optional string, 56: col2: optional string>, 54: col2: optional
│ struct<57: col1: optional string, 58: col2:
optional string>>>
```
In this example, the visitor will use the "key"'s projected schema to search for
the "value"'s name, resulting in a
```
ValueError: Could not find field with name innerStruct3, case_sensitive=True
```
##########
python/pyiceberg/io/pyarrow.py:
##########
@@ -749,7 +869,16 @@ def _task_to_table(
schema_raw = metadata.get(ICEBERG_SCHEMA)
# TODO: if field_ids are not present, Name Mapping should be
implemented to look them up in the table schema,
# see https://github.com/apache/iceberg/issues/7451
- file_schema = Schema.parse_raw(schema_raw) if schema_raw is not None
else pyarrow_to_schema(physical_schema)
+ file_schema = (
+ Schema.parse_raw(schema_raw)
+ if schema_raw is not None
+ else pyarrow_to_schema(
+ physical_schema,
+ projected_schema,
Review Comment:
I think we may want the full table schema here. `pyarrow_to_schema` is supposed to
simply convert the physical schema to the Iceberg schema without handling
column pruning. However, `projected_schema` only contains the columns selected in a
table scan. If we use it during the conversion, we will have to ignore
unselected columns, which I think is unnecessary and tricky to implement. (It is also
inconsistent with the behavior when field_id is present.)
##########
python/pyiceberg/io/pyarrow.py:
##########
@@ -641,41 +649,125 @@ def primitive(self, primitive: pa.DataType) ->
Optional[T]:
def _get_field_id(field: pa.Field) -> Optional[int]:
- for pyarrow_field_id_key in PYARROW_FIELD_ID_KEYS:
- if field_id_str := field.metadata.get(pyarrow_field_id_key):
- return int(field_id_str.decode())
+ if field.metadata is not None:
+ for pyarrow_field_id_key in PYARROW_FIELD_ID_KEYS:
+ if field_id_str := field.metadata.get(pyarrow_field_id_key):
+ return int(field_id_str.decode())
return None
def _get_field_doc(field: pa.Field) -> Optional[str]:
- for pyarrow_doc_key in PYARROW_FIELD_DOC_KEYS:
- if doc_str := field.metadata.get(pyarrow_doc_key):
- return doc_str.decode()
+ if field.metadata is not None:
+ for pyarrow_doc_key in PYARROW_FIELD_DOC_KEYS:
+ if doc_str := field.metadata.get(pyarrow_doc_key):
+ return doc_str.decode()
return None
-class _ConvertToIceberg(PyArrowSchemaVisitor[Union[IcebergType, Schema]]):
+class _ConvertToIceberg(PyArrowSchemaVisitor[Union[IcebergType]]):
+ projected_schema: Union[Schema, ListType, MapType, None]
+ match_with_field_name: bool
+ ignore_unprojectable_fields: bool
+ projected_schema_stack: List[Tuple[Schema | ListType | MapType | None,
Optional[_Literal["key", "value"]]]]
+ next: Optional[_Literal["key", "value"]]
+
def _convert_fields(self, arrow_fields: Iterable[pa.Field], field_results:
List[Optional[IcebergType]]) -> List[NestedField]:
fields = []
for i, field in enumerate(arrow_fields):
field_id = _get_field_id(field)
field_doc = _get_field_doc(field)
field_type = field_results[i]
- if field_type is not None and field_id is not None:
- fields.append(NestedField(field_id, field.name, field_type,
required=not field.nullable, doc=field_doc))
+ ib_field: Optional[NestedField] = None
+ if field_type is not None:
+ if field_id is not None:
+ ib_field = NestedField(field_id, field.name, field_type,
required=not field.nullable, doc=field_doc)
+ elif self.match_with_field_name:
+ if not isinstance(self.projected_schema, Schema):
+ raise ValueError("projected_schema must be provided if
match_with_field_name is set to True")
+ try:
+ projected_field =
self.projected_schema.find_field(field.name)
+ except ValueError as e:
+ if self.ignore_unprojectable_fields:
+ continue
+ raise ValueError(
+ f"could not find a field that corresponds to
{field.name} in projected schema {self.projected_schema}"
+ ) from e
+ ib_field = NestedField(
+ projected_field.field_id, field.name, field_type,
required=not field.nullable, doc=field_doc
+ )
+ if ib_field is not None:
+ fields.append(ib_field)
return fields
- def schema(self, schema: pa.Schema, field_results:
List[Optional[IcebergType]]) -> Schema:
- return Schema(*self._convert_fields(schema, field_results))
+ def schema(self, schema: pa.Schema, field_results:
List[Optional[IcebergType]]) -> StructType:
+ return StructType(*self._convert_fields(schema, field_results))
+
+ def before_field(self, field: pa.Field) -> None:
+ if not isinstance(field.type, (pa.StructType, pa.ListType,
pa.MapType)):
+ return
+
+ projected_field: Optional[NestedField] = None
+
+ if isinstance(self.projected_schema, Schema):
+ field_id = _get_field_id(field)
+ if field_id is not None:
+ try:
+ projected_field =
self.projected_schema.find_field(field_id)
+ except ValueError:
+ if not self.match_with_field_name:
+ raise
+ if projected_field is None and self.match_with_field_name:
+ projected_field = self.projected_schema.find_field(field.name)
Review Comment:
For example, I have a table with schema:
```
Schema, id=0
├── 1: name: optional struct<3: firstname: optional
string, 4: middlename: optional string, 5: lastname: optional
│ string>
└── 2: address: optional struct<6: current: optional
struct<8: state: optional string, 9: city: optional string>, 7:
previous: optional struct<10: state: optional
string, 11: city: optional string>>
```
If I only query the `name` column:
```python
catalog.load_table(table_name).scan(selected_fields=("name",
)).to_pandas(match_with_field_name=True, ignore_unprojectable_fields=True)
```
I got the following error, raised by `find_field` here:
```
ValueError: Could not find field with name address, case_sensitive=True
```
##########
python/pyiceberg/io/pyarrow.py:
##########
@@ -641,41 +649,125 @@ def primitive(self, primitive: pa.DataType) ->
Optional[T]:
def _get_field_id(field: pa.Field) -> Optional[int]:
- for pyarrow_field_id_key in PYARROW_FIELD_ID_KEYS:
- if field_id_str := field.metadata.get(pyarrow_field_id_key):
- return int(field_id_str.decode())
+ if field.metadata is not None:
+ for pyarrow_field_id_key in PYARROW_FIELD_ID_KEYS:
+ if field_id_str := field.metadata.get(pyarrow_field_id_key):
+ return int(field_id_str.decode())
return None
def _get_field_doc(field: pa.Field) -> Optional[str]:
- for pyarrow_doc_key in PYARROW_FIELD_DOC_KEYS:
- if doc_str := field.metadata.get(pyarrow_doc_key):
- return doc_str.decode()
+ if field.metadata is not None:
+ for pyarrow_doc_key in PYARROW_FIELD_DOC_KEYS:
+ if doc_str := field.metadata.get(pyarrow_doc_key):
+ return doc_str.decode()
return None
-class _ConvertToIceberg(PyArrowSchemaVisitor[Union[IcebergType, Schema]]):
+class _ConvertToIceberg(PyArrowSchemaVisitor[Union[IcebergType]]):
+ projected_schema: Union[Schema, ListType, MapType, None]
+ match_with_field_name: bool
+ ignore_unprojectable_fields: bool
+ projected_schema_stack: List[Tuple[Schema | ListType | MapType | None,
Optional[_Literal["key", "value"]]]]
+ next: Optional[_Literal["key", "value"]]
+
def _convert_fields(self, arrow_fields: Iterable[pa.Field], field_results:
List[Optional[IcebergType]]) -> List[NestedField]:
fields = []
for i, field in enumerate(arrow_fields):
field_id = _get_field_id(field)
field_doc = _get_field_doc(field)
field_type = field_results[i]
- if field_type is not None and field_id is not None:
- fields.append(NestedField(field_id, field.name, field_type,
required=not field.nullable, doc=field_doc))
+ ib_field: Optional[NestedField] = None
+ if field_type is not None:
+ if field_id is not None:
+ ib_field = NestedField(field_id, field.name, field_type,
required=not field.nullable, doc=field_doc)
+ elif self.match_with_field_name:
+ if not isinstance(self.projected_schema, Schema):
+ raise ValueError("projected_schema must be provided if
match_with_field_name is set to True")
+ try:
+ projected_field =
self.projected_schema.find_field(field.name)
+ except ValueError as e:
+ if self.ignore_unprojectable_fields:
+ continue
+ raise ValueError(
+ f"could not find a field that corresponds to
{field.name} in projected schema {self.projected_schema}"
+ ) from e
+ ib_field = NestedField(
+ projected_field.field_id, field.name, field_type,
required=not field.nullable, doc=field_doc
+ )
+ if ib_field is not None:
+ fields.append(ib_field)
return fields
- def schema(self, schema: pa.Schema, field_results:
List[Optional[IcebergType]]) -> Schema:
- return Schema(*self._convert_fields(schema, field_results))
+ def schema(self, schema: pa.Schema, field_results:
List[Optional[IcebergType]]) -> StructType:
+ return StructType(*self._convert_fields(schema, field_results))
+
+ def before_field(self, field: pa.Field) -> None:
+ if not isinstance(field.type, (pa.StructType, pa.ListType,
pa.MapType)):
+ return
+
+ projected_field: Optional[NestedField] = None
+
+ if isinstance(self.projected_schema, Schema):
+ field_id = _get_field_id(field)
+ if field_id is not None:
+ try:
+ projected_field =
self.projected_schema.find_field(field_id)
+ except ValueError:
+ if not self.match_with_field_name:
+ raise
+ if projected_field is None and self.match_with_field_name:
+ projected_field = self.projected_schema.find_field(field.name)
+ elif isinstance(self.projected_schema, ListType):
+ projected_field = self.projected_schema.element_field
+ elif isinstance(self.projected_schema, MapType):
+ if self.next == "key":
+ projected_field = self.projected_schema.key_field
+ elif self.next == "value":
+ projected_field = self.projected_schema.value_field
+ else:
+ raise AssertionError("should never get here")
+
+ self.projected_schema_stack.append((self.projected_schema, self.next))
+ inner_schema: Schema | ListType | MapType | None = None
+ if projected_field is not None:
+ field_type = projected_field.field_type
+ if isinstance(field_type, StructType):
+ inner_schema = Schema(*field_type.fields)
+ else:
+ if isinstance(field_type, (ListType, MapType)):
+ inner_schema = field_type
+
+ self.projected_schema = inner_schema
+ self.next = "key" if isinstance(field.type, pa.MapType) else None
+
+ def after_field(self, field: pa.Field) -> None:
+ if self.next == "key":
+ self.next = "value"
+ elif self.next == "value":
+ self.next = None
+ if not isinstance(field.type, (pa.StructType, pa.ListType,
pa.MapType)):
+ return
+ (self.projected_schema, self.next) = self.projected_schema_stack.pop()
Review Comment:
I think we may want to step `self.next` forward (key -> value, value ->
None) when popping from the stack, since in the `after_field` phase we have already
visited the field specified by the `self.next` stored on the stack.
This can cause an error when we have a MapType whose key and value types are
nested:
```
19: mapCol1: optional map<struct<47: innerStruct1: optional struct<49:
random1: optional long, 50: random2:
│ optional long>, 48: innerStruct2: optional
struct<51: random3: optional long, 52: random4: optional long>>,
│ struct<53: innerStruct3: optional struct<55: col1:
optional string, 56: col2: optional string>, 54: col2: optional
│ struct<57: col1: optional string, 58: col2:
optional string>>>
```
In this example, the visitor will use the "key"'s projected schema to search for
the "value"'s name, resulting in a
```
ValueError: Could not find field with name innerStruct3, case_sensitive=True
```
##########
python/pyiceberg/io/pyarrow.py:
##########
@@ -641,41 +649,125 @@ def primitive(self, primitive: pa.DataType) ->
Optional[T]:
def _get_field_id(field: pa.Field) -> Optional[int]:
- for pyarrow_field_id_key in PYARROW_FIELD_ID_KEYS:
- if field_id_str := field.metadata.get(pyarrow_field_id_key):
- return int(field_id_str.decode())
+ if field.metadata is not None:
+ for pyarrow_field_id_key in PYARROW_FIELD_ID_KEYS:
+ if field_id_str := field.metadata.get(pyarrow_field_id_key):
+ return int(field_id_str.decode())
return None
def _get_field_doc(field: pa.Field) -> Optional[str]:
- for pyarrow_doc_key in PYARROW_FIELD_DOC_KEYS:
- if doc_str := field.metadata.get(pyarrow_doc_key):
- return doc_str.decode()
+ if field.metadata is not None:
+ for pyarrow_doc_key in PYARROW_FIELD_DOC_KEYS:
+ if doc_str := field.metadata.get(pyarrow_doc_key):
+ return doc_str.decode()
return None
-class _ConvertToIceberg(PyArrowSchemaVisitor[Union[IcebergType, Schema]]):
+class _ConvertToIceberg(PyArrowSchemaVisitor[Union[IcebergType]]):
+ projected_schema: Union[Schema, ListType, MapType, None]
+ match_with_field_name: bool
+ ignore_unprojectable_fields: bool
+ projected_schema_stack: List[Tuple[Schema | ListType | MapType | None,
Optional[_Literal["key", "value"]]]]
+ next: Optional[_Literal["key", "value"]]
+
def _convert_fields(self, arrow_fields: Iterable[pa.Field], field_results:
List[Optional[IcebergType]]) -> List[NestedField]:
fields = []
for i, field in enumerate(arrow_fields):
field_id = _get_field_id(field)
field_doc = _get_field_doc(field)
field_type = field_results[i]
- if field_type is not None and field_id is not None:
- fields.append(NestedField(field_id, field.name, field_type,
required=not field.nullable, doc=field_doc))
+ ib_field: Optional[NestedField] = None
+ if field_type is not None:
+ if field_id is not None:
+ ib_field = NestedField(field_id, field.name, field_type,
required=not field.nullable, doc=field_doc)
+ elif self.match_with_field_name:
+ if not isinstance(self.projected_schema, Schema):
+ raise ValueError("projected_schema must be provided if
match_with_field_name is set to True")
+ try:
+ projected_field =
self.projected_schema.find_field(field.name)
+ except ValueError as e:
+ if self.ignore_unprojectable_fields:
+ continue
+ raise ValueError(
+ f"could not find a field that corresponds to
{field.name} in projected schema {self.projected_schema}"
+ ) from e
+ ib_field = NestedField(
+ projected_field.field_id, field.name, field_type,
required=not field.nullable, doc=field_doc
+ )
+ if ib_field is not None:
+ fields.append(ib_field)
return fields
- def schema(self, schema: pa.Schema, field_results:
List[Optional[IcebergType]]) -> Schema:
- return Schema(*self._convert_fields(schema, field_results))
+ def schema(self, schema: pa.Schema, field_results:
List[Optional[IcebergType]]) -> StructType:
+ return StructType(*self._convert_fields(schema, field_results))
+
+ def before_field(self, field: pa.Field) -> None:
+ if not isinstance(field.type, (pa.StructType, pa.ListType,
pa.MapType)):
+ return
+
+ projected_field: Optional[NestedField] = None
+
+ if isinstance(self.projected_schema, Schema):
+ field_id = _get_field_id(field)
+ if field_id is not None:
+ try:
+ projected_field =
self.projected_schema.find_field(field_id)
+ except ValueError:
+ if not self.match_with_field_name:
+ raise
+ if projected_field is None and self.match_with_field_name:
+ projected_field = self.projected_schema.find_field(field.name)
Review Comment:
For example, I have a table with schema:
```
Schema, id=0
├── 1: name: optional struct<3: firstname: optional
string, 4: middlename: optional string, 5: lastname: optional
│ string>
└── 2: address: optional struct<6: current: optional
struct<8: state: optional string, 9: city: optional string>, 7:
previous: optional struct<10: state: optional
string, 11: city: optional string>>
```
If I only query the `name` column:
```python
catalog.load_table(table_name).scan(selected_fields=("name",
)).to_pandas(match_with_field_name=True, ignore_unprojectable_fields=True)
```
I got the following error, raised by `find_field` here:
```
ValueError: Could not find field with name address, case_sensitive=True
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]