rdblue commented on code in PR #6506:
URL: https://github.com/apache/iceberg/pull/6506#discussion_r1060129011
##########
python/pyiceberg/avro/resolver.py:
##########
@@ -14,123 +14,250 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
-from functools import singledispatch
+# pylint: disable=arguments-renamed,unused-argument
+from functools import partial
from typing import (
+ Callable,
+ Dict,
List,
Optional,
Tuple,
Union,
)
from pyiceberg.avro.reader import (
- ConstructReader,
+ BinaryReader,
+ BooleanReader,
+ DateReader,
+ DecimalReader,
+ DoubleReader,
+ FixedReader,
+ FloatReader,
+ IntegerReader,
ListReader,
MapReader,
NoneReader,
OptionReader,
Reader,
+ StringReader,
+ StructProtocolReader,
StructReader,
+ TimeReader,
+ TimestampReader,
+ TimestamptzReader,
+ UUIDReader,
)
from pyiceberg.exceptions import ResolveError
-from pyiceberg.schema import Schema, promote, visit
+from pyiceberg.schema import (
+ PartnerAccessor,
+ PrimitiveWithPartnerVisitor,
+ Schema,
+ promote,
+ visit_with_partner,
+)
+from pyiceberg.typedef import EMPTY_DICT, StructProtocol
from pyiceberg.types import (
+ BinaryType,
+ BooleanType,
+ DateType,
+ DecimalType,
DoubleType,
+ FixedType,
FloatType,
IcebergType,
+ IntegerType,
ListType,
+ LongType,
MapType,
+ NestedField,
PrimitiveType,
+ StringType,
StructType,
+ TimestampType,
+ TimestamptzType,
+ TimeType,
+ UUIDType,
)
-@singledispatch
-def resolve(file_schema: Union[Schema, IcebergType], read_schema:
Union[Schema, IcebergType]) -> Reader:
- """This resolves the file and read schema
+def construct_reader(file_schema: Union[Schema, IcebergType]) -> Reader:
+ """Constructs a reader from a file schema
+
+ Args:
+ file_schema (Schema | IcebergType): The schema of the Avro file
+
+ Raises:
+ NotImplementedError: If attempting to resolve an unrecognized object
type
+ """
+ return resolve(file_schema, file_schema)
+
- The function traverses the schema in post-order fashion
+def resolve(
+ file_schema: Union[Schema, IcebergType],
+ read_schema: Union[Schema, IcebergType],
+ read_types: Dict[int, Callable[[Schema], StructProtocol]] = EMPTY_DICT,
+) -> Reader:
+ """Resolves the file and read schema to produce a reader
- Args:
- file_schema (Schema | IcebergType): The schema of the Avro file
- read_schema (Schema | IcebergType): The requested read schema which
is equal, subset or superset of the file schema
+ Args:
+ file_schema (Schema | IcebergType): The schema of the Avro file
+ read_schema (Schema | IcebergType): The requested read schema which is
equal, subset or superset of the file schema
+ read_types (Dict[int, Callable[[Schema], StructProtocol]]): A dict of
types to use for struct data
- Raises:
- NotImplementedError: If attempting to resolve an unrecognized object
type
+ Raises:
+ NotImplementedError: If attempting to resolve an unrecognized object
type
"""
- raise NotImplementedError(f"Cannot resolve non-type: {file_schema}")
+ return visit_with_partner(file_schema, read_schema,
SchemaResolver(read_types), SchemaPartnerAccessor()) # type: ignore
+
+
+class SchemaResolver(PrimitiveWithPartnerVisitor[IcebergType, Reader]):
+ read_types: Dict[int, Callable[[Schema], StructProtocol]]
+ field_ids: List[int]
+
+ def before_field(self, field: NestedField, field_partner:
Optional[IcebergType]) -> None:
+ self.field_ids.append(field.field_id)
+
+ def after_field(self, field: NestedField, field_partner:
Optional[IcebergType]) -> None:
+ self.field_ids.pop()
+
+ def create_struct_reader(self, read_schema: StructType, field_readers:
Tuple[Tuple[Optional[int], Reader], ...]) -> Reader:
+ current_field_id = self.field_ids[-1] if self.field_ids else -1
+ if constructor := self.read_types.get(current_field_id):
+ return StructProtocolReader(field_readers, partial(constructor,
read_schema))
+
+ return StructReader(field_readers)
+
+ def __init__(self, read_types: Dict[int, Callable[[Schema],
StructProtocol]]):
+ self.read_types = read_types
+ self.field_ids = []
+
+ def schema(self, schema: Schema, expected_schema: Optional[IcebergType],
result: Reader) -> Reader:
+ return result
+
+ def struct(self, struct: StructType, expected_struct:
Optional[IcebergType], field_readers: List[Reader]) -> Reader:
+ if not expected_struct:
+ # no values are expected so the reader will only be used for
skipping
+ return StructReader(tuple(enumerate(field_readers)))
+
+ if not isinstance(expected_struct, StructType):
+ raise ResolveError(f"File/read schema are not aligned for struct,
got {expected_struct}")
+
+ results: List[Tuple[Optional[int], Reader]] = []
+ expected_positions: Dict[int, int] = {field.field_id: pos for pos,
field in enumerate(expected_struct.fields)}
+
+ # first, add readers for the file fields that must be in order
+ for field, result_reader in zip(struct.fields, field_readers):
Review Comment:
This was mostly to match the previous implementation. I'm good with your
suggestion here.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]