rdblue commented on code in PR #6506:
URL: https://github.com/apache/iceberg/pull/6506#discussion_r1060129376
##########
python/pyiceberg/avro/resolver.py:
##########
@@ -14,123 +14,250 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
-from functools import singledispatch
+# pylint: disable=arguments-renamed,unused-argument
+from functools import partial
from typing import (
+ Callable,
+ Dict,
List,
Optional,
Tuple,
Union,
)
from pyiceberg.avro.reader import (
- ConstructReader,
+ BinaryReader,
+ BooleanReader,
+ DateReader,
+ DecimalReader,
+ DoubleReader,
+ FixedReader,
+ FloatReader,
+ IntegerReader,
ListReader,
MapReader,
NoneReader,
OptionReader,
Reader,
+ StringReader,
+ StructProtocolReader,
StructReader,
+ TimeReader,
+ TimestampReader,
+ TimestamptzReader,
+ UUIDReader,
)
from pyiceberg.exceptions import ResolveError
-from pyiceberg.schema import Schema, promote, visit
+from pyiceberg.schema import (
+ PartnerAccessor,
+ PrimitiveWithPartnerVisitor,
+ Schema,
+ promote,
+ visit_with_partner,
+)
+from pyiceberg.typedef import EMPTY_DICT, StructProtocol
from pyiceberg.types import (
+ BinaryType,
+ BooleanType,
+ DateType,
+ DecimalType,
DoubleType,
+ FixedType,
FloatType,
IcebergType,
+ IntegerType,
ListType,
+ LongType,
MapType,
+ NestedField,
PrimitiveType,
+ StringType,
StructType,
+ TimestampType,
+ TimestamptzType,
+ TimeType,
+ UUIDType,
)
-@singledispatch
-def resolve(file_schema: Union[Schema, IcebergType], read_schema:
Union[Schema, IcebergType]) -> Reader:
- """This resolves the file and read schema
+def construct_reader(file_schema: Union[Schema, IcebergType]) -> Reader:
+ """Constructs a reader from a file schema
+
+ Args:
+ file_schema (Schema | IcebergType): The schema of the Avro file
+
+ Raises:
+ NotImplementedError: If attempting to resolve an unrecognized object
type
+ """
+ return resolve(file_schema, file_schema)
+
- The function traverses the schema in post-order fashion
+def resolve(
+ file_schema: Union[Schema, IcebergType],
+ read_schema: Union[Schema, IcebergType],
+ read_types: Dict[int, Callable[[Schema], StructProtocol]] = EMPTY_DICT,
+) -> Reader:
+ """Resolves the file and read schema to produce a reader
- Args:
- file_schema (Schema | IcebergType): The schema of the Avro file
- read_schema (Schema | IcebergType): The requested read schema which
is equal, subset or superset of the file schema
+ Args:
+ file_schema (Schema | IcebergType): The schema of the Avro file
+ read_schema (Schema | IcebergType): The requested read schema which is
equal, subset or superset of the file schema
+ read_types (Dict[int, Callable[[Schema], StructProtocol]]): A dict of
types to use for struct data
- Raises:
- NotImplementedError: If attempting to resolve an unrecognized object
type
+ Raises:
+ NotImplementedError: If attempting to resolve an unrecognized object
type
"""
- raise NotImplementedError(f"Cannot resolve non-type: {file_schema}")
+ return visit_with_partner(file_schema, read_schema,
SchemaResolver(read_types), SchemaPartnerAccessor()) # type: ignore
+
+
+class SchemaResolver(PrimitiveWithPartnerVisitor[IcebergType, Reader]):
+ read_types: Dict[int, Callable[[Schema], StructProtocol]]
+ field_ids: List[int]
+
+ def before_field(self, field: NestedField, field_partner:
Optional[IcebergType]) -> None:
+ self.field_ids.append(field.field_id)
+
+ def after_field(self, field: NestedField, field_partner:
Optional[IcebergType]) -> None:
+ self.field_ids.pop()
+
+ def create_struct_reader(self, read_schema: StructType, field_readers:
Tuple[Tuple[Optional[int], Reader], ...]) -> Reader:
+ current_field_id = self.field_ids[-1] if self.field_ids else -1
+ if constructor := self.read_types.get(current_field_id):
Review Comment:
I think the -1 is needed so that we have some way to inject a struct type
for the top-level schema, which has no enclosing field ID (`self.field_ids` is empty there, so the lookup falls back to -1).
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]