rdblue commented on code in PR #6506:
URL: https://github.com/apache/iceberg/pull/6506#discussion_r1060129376


##########
python/pyiceberg/avro/resolver.py:
##########
@@ -14,123 +14,250 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
-from functools import singledispatch
+# pylint: disable=arguments-renamed,unused-argument
+from functools import partial
 from typing import (
+    Callable,
+    Dict,
     List,
     Optional,
     Tuple,
     Union,
 )
 
 from pyiceberg.avro.reader import (
-    ConstructReader,
+    BinaryReader,
+    BooleanReader,
+    DateReader,
+    DecimalReader,
+    DoubleReader,
+    FixedReader,
+    FloatReader,
+    IntegerReader,
     ListReader,
     MapReader,
     NoneReader,
     OptionReader,
     Reader,
+    StringReader,
+    StructProtocolReader,
     StructReader,
+    TimeReader,
+    TimestampReader,
+    TimestamptzReader,
+    UUIDReader,
 )
 from pyiceberg.exceptions import ResolveError
-from pyiceberg.schema import Schema, promote, visit
+from pyiceberg.schema import (
+    PartnerAccessor,
+    PrimitiveWithPartnerVisitor,
+    Schema,
+    promote,
+    visit_with_partner,
+)
+from pyiceberg.typedef import EMPTY_DICT, StructProtocol
 from pyiceberg.types import (
+    BinaryType,
+    BooleanType,
+    DateType,
+    DecimalType,
     DoubleType,
+    FixedType,
     FloatType,
     IcebergType,
+    IntegerType,
     ListType,
+    LongType,
     MapType,
+    NestedField,
     PrimitiveType,
+    StringType,
     StructType,
+    TimestampType,
+    TimestamptzType,
+    TimeType,
+    UUIDType,
 )
 
 
-@singledispatch
-def resolve(file_schema: Union[Schema, IcebergType], read_schema: Union[Schema, IcebergType]) -> Reader:
-    """This resolves the file and read schema
+def construct_reader(file_schema: Union[Schema, IcebergType]) -> Reader:
+    """Constructs a reader from a file schema
+
+    Args:
+        file_schema (Schema | IcebergType): The schema of the Avro file
+
+    Raises:
+        NotImplementedError: If attempting to resolve an unrecognized object type
+    """
+    return resolve(file_schema, file_schema)
+
 
-    The function traverses the schema in post-order fashion
+def resolve(
+    file_schema: Union[Schema, IcebergType],
+    read_schema: Union[Schema, IcebergType],
+    read_types: Dict[int, Callable[[Schema], StructProtocol]] = EMPTY_DICT,
+) -> Reader:
+    """Resolves the file and read schema to produce a reader
 
-     Args:
-         file_schema (Schema | IcebergType): The schema of the Avro file
-         read_schema (Schema | IcebergType): The requested read schema which is equal, subset or superset of the file schema
+    Args:
+        file_schema (Schema | IcebergType): The schema of the Avro file
+        read_schema (Schema | IcebergType): The requested read schema which is equal, subset or superset of the file schema
+        read_types (Dict[int, Callable[[Schema], StructProtocol]]): A dict of types to use for struct data
 
-     Raises:
-         NotImplementedError: If attempting to resolve an unrecognized object type
+    Raises:
+        NotImplementedError: If attempting to resolve an unrecognized object type
     """
-    raise NotImplementedError(f"Cannot resolve non-type: {file_schema}")
+    return visit_with_partner(file_schema, read_schema, SchemaResolver(read_types), SchemaPartnerAccessor())  # type: ignore
+
+
+class SchemaResolver(PrimitiveWithPartnerVisitor[IcebergType, Reader]):
+    read_types: Dict[int, Callable[[Schema], StructProtocol]]
+    field_ids: List[int]
+
+    def before_field(self, field: NestedField, field_partner: Optional[IcebergType]) -> None:
+        self.field_ids.append(field.field_id)
+
+    def after_field(self, field: NestedField, field_partner: Optional[IcebergType]) -> None:
+        self.field_ids.pop()
+
+    def create_struct_reader(self, read_schema: StructType, field_readers: Tuple[Tuple[Optional[int], Reader], ...]) -> Reader:
+        current_field_id = self.field_ids[-1] if self.field_ids else -1
+        if constructor := self.read_types.get(current_field_id):

Review Comment:
   I think the -1 is needed so that we have some way to inject a struct type 
for the top-level schema.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to