Fokko commented on code in PR #6525:
URL: https://github.com/apache/iceberg/pull/6525#discussion_r1062349688


##########
python/pyiceberg/avro/reader.py:
##########
@@ -238,26 +249,25 @@ def skip(self, decoder: BinaryDecoder) -> None:
             return self.option.skip(decoder)
 
 
-class StructProtocolReader(Reader):
-    create_struct: Callable[[], StructProtocol]
+class StructReader(Reader):
     fields: Tuple[Tuple[Optional[int], Reader], ...]
+    create_struct: Type[StructProtocol]
 
-    def __init__(self, fields: Tuple[Tuple[Optional[int], Reader], ...], 
create_struct: Callable[[], StructProtocol]):
-        self.create_struct = create_struct
+    def __init__(self, fields: Tuple[Tuple[Optional[int], Reader], ...], 
create_struct: Type[StructProtocol] = Record):
         self.fields = fields
-
-    def create_or_reuse(self, reuse: Optional[StructProtocol]) -> 
StructProtocol:
-        if reuse:
-            return reuse
-        else:
-            return self.create_struct()
+        self.create_struct = create_struct or Record
 
     def read(self, decoder: BinaryDecoder) -> Any:
-        struct = self.create_or_reuse(None)
+        if issubclass(self.create_struct, Record):
+            struct = Record.of(len(self.fields))

Review Comment:
   This is only used for the `partition` struct in the ManifestEntry.



##########
python/pyiceberg/avro/reader.py:
##########
@@ -238,26 +249,25 @@ def skip(self, decoder: BinaryDecoder) -> None:
             return self.option.skip(decoder)
 
 
-class StructProtocolReader(Reader):
-    create_struct: Callable[[], StructProtocol]
+class StructReader(Reader):

Review Comment:
   I removed the `StructProtocolReader` from the hierarchy. Not sure why we 
need it.



##########
python/pyiceberg/manifest.py:
##########
@@ -76,19 +76,100 @@ def __repr__(self) -> str:
         return f"FileFormat.{self.name}"
 
 
-class DataFile(IcebergBaseModel):
+DATA_FILE = StructType(
+    NestedField(
+        field_id=134,
+        name="content",
+        field_type=IntegerType(),
+        required=False,
+        doc="Contents of the file: 0=data, 1=position deletes, 2=equality 
deletes",
+    ),
+    NestedField(field_id=100, name="file_path", field_type=StringType(), 
required=True, doc="Location URI with FS scheme"),
+    NestedField(
+        field_id=101, name="file_format", field_type=StringType(), 
required=True, doc="File format name: avro, orc, or parquet"
+    ),
+    NestedField(
+        field_id=102,
+        name="partition",
+        field_type=StructType(),
+        required=True,
+        doc="Partition data tuple, schema based on the partition spec",
+    ),
+    NestedField(field_id=103, name="record_count", field_type=LongType(), 
required=True, doc="Number of records in the file"),
+    NestedField(field_id=104, name="file_size_in_bytes", 
field_type=LongType(), required=True, doc="Total file size in bytes"),
+    NestedField(
+        field_id=108,
+        name="column_sizes",
+        field_type=MapType(key_id=117, key_type=IntegerType(), value_id=118, 
value_type=LongType()),
+        required=True,
+        doc="Map of column id to total size on disk",
+    ),
+    NestedField(
+        field_id=109,
+        name="value_counts",
+        field_type=MapType(key_id=119, key_type=IntegerType(), value_id=120, 
value_type=LongType()),
+        required=True,
+        doc="Map of column id to total count, including null and NaN",
+    ),
+    NestedField(
+        field_id=110,
+        name="null_value_counts",
+        field_type=MapType(key_id=121, key_type=IntegerType(), value_id=122, 
value_type=LongType()),
+        required=False,
+        doc="Map of column id to null value count",
+    ),
+    NestedField(
+        field_id=137,
+        name="nan_value_counts",
+        field_type=MapType(key_id=138, key_type=IntegerType(), value_id=139, 
value_type=LongType()),
+        required=False,
+        doc="Map of column id to number of NaN values in the column",
+    ),
+    NestedField(
+        field_id=125,
+        name="lower_bounds",
+        field_type=MapType(key_id=126, key_type=IntegerType(), value_id=127, 
value_type=BinaryType()),
+        required=False,
+        doc="Map of column id to lower bound",
+    ),
+    NestedField(
+        field_id=128,
+        name="upper_bounds",
+        field_type=MapType(key_id=129, key_type=IntegerType(), value_id=130, 
value_type=BinaryType()),
+        required=False,
+        doc="Map of column id to upper bound",
+    ),
+    NestedField(field_id=131, name="key_metadata", field_type=BinaryType(), 
required=False, doc="Encryption key metadata blob"),
+    NestedField(
+        field_id=132,
+        name="split_offsets",
+        field_type=ListType(element_id=133, element_type=LongType(), 
element_required=True),
+        required=False,
+        doc="Splittable offsets",
+    ),
+    NestedField(
+        field_id=135,
+        name="equality_ids",
+        field_type=ListType(element_id=136, element_type=LongType(), 
element_required=True),
+        required=False,
+        doc="Equality comparison field IDs",
+    ),
+    NestedField(field_id=140, name="sort_order_id", field_type=IntegerType(), 
required=False, doc="Sort order ID"),
+    NestedField(field_id=141, name="spec_id", field_type=IntegerType(), 
required=False, doc="Partition spec ID"),
+)
+
+
+class DataFile(PydanticStruct):
     content: DataFileContent = Field(default=DataFileContent.DATA)
     file_path: str = Field()
     file_format: FileFormat = Field()
-    partition: Dict[str, Any] = Field()
+    partition: Record = Field()

Review Comment:
   
![giphy](https://user-images.githubusercontent.com/1134248/210768222-ede794ff-4dfb-4d28-968b-6fcf9e197eb2.gif)
   



##########
python/pyiceberg/avro/reader.py:
##########
@@ -238,26 +249,25 @@ def skip(self, decoder: BinaryDecoder) -> None:
             return self.option.skip(decoder)
 
 
-class StructProtocolReader(Reader):
-    create_struct: Callable[[], StructProtocol]
+class StructReader(Reader):
     fields: Tuple[Tuple[Optional[int], Reader], ...]
+    create_struct: Type[StructProtocol]
 
-    def __init__(self, fields: Tuple[Tuple[Optional[int], Reader], ...], 
create_struct: Callable[[], StructProtocol]):
-        self.create_struct = create_struct
+    def __init__(self, fields: Tuple[Tuple[Optional[int], Reader], ...], 
create_struct: Type[StructProtocol] = Record):
         self.fields = fields
-
-    def create_or_reuse(self, reuse: Optional[StructProtocol]) -> 
StructProtocol:

Review Comment:
   I left out the reuse for now. This conflicted with the default values. For 
example, the DataFile has `DataFileContent.DATA` as the default value for 
`content`. If we want to re-use we also need to re-set the defaults (this is 
quite easy since we can just loop over the Pydantic fields).



##########
python/pyiceberg/avro/resolver.py:
##########
@@ -109,38 +109,46 @@ def resolve(
 
 
 class SchemaResolver(PrimitiveWithPartnerVisitor[IcebergType, Reader]):
-    read_types: Optional[Dict[int, Callable[[Schema], StructProtocol]]]
+    read_types: Dict[int, Type[StructProtocol]]
+    context: List[int]
 
-    def __init__(self, read_types: Optional[Dict[int, Callable[[Schema], 
StructProtocol]]]):
+    def __init__(self, read_types: Dict[int, Type[StructProtocol]] = 
EMPTY_DICT) -> None:
         self.read_types = read_types
+        self.context = []
 
     def schema(self, schema: Schema, expected_schema: Optional[IcebergType], 
result: Reader) -> Reader:
         return result
 
+    def before_field(self, field: NestedField, field_partner: 
Optional[NestedField]) -> None:
+        self.context.append(field.field_id)
+
+    def after_field(self, field: NestedField, field_partner: 
Optional[NestedField]) -> None:
+        self.context.pop()
+
     def struct(self, struct: StructType, expected_struct: 
Optional[IcebergType], field_readers: List[Reader]) -> Reader:
+        # -1 indicates the struct root
+        read_struct_id = self.context[-1] if len(self.context) > 0 else -1
+        struct_callable = self.read_types.get(read_struct_id, Record)
+
         if not expected_struct:
-            return StructReader(tuple(enumerate(field_readers)))
+            return StructReader(tuple(enumerate(field_readers)), 
struct_callable)
 
         if not isinstance(expected_struct, StructType):
             raise ResolveError(f"File/read schema are not aligned for struct, 
got {expected_struct}")
 
-        results: List[Tuple[Optional[int], Reader]] = []
         expected_positions: Dict[int, int] = {field.field_id: pos for pos, 
field in enumerate(expected_struct.fields)}
 
         # first, add readers for the file fields that must be in order
-        for field, result_reader in zip(struct.fields, field_readers):
-            read_pos = expected_positions.get(field.field_id)
-            results.append((read_pos, result_reader))
+        results: List[Tuple[Optional[int], Reader]] = [
+            (expected_positions.get(field.field_id), result_reader) for field, 
result_reader in zip(struct.fields, field_readers)
+        ]
 
         file_fields = {field.field_id: field for field in struct.fields}
-        for pos, read_field in enumerate(expected_struct.fields):
-            if read_field.field_id not in file_fields:
-                if read_field.required:
-                    raise ResolveError(f"{read_field} is non-optional, and not 
part of the file schema")
-                # Just set the new field to None
-                results.append((pos, NoneReader()))

Review Comment:
   I had to leave those out since they override the default values (that are 
set when loading a v1 table). If we want to re-use we could change this to 
`DefaultOrNoneReader()` that will prioritize the fields' default value over 
just writing `None`



##########
python/pyiceberg/typedef.py:
##########
@@ -72,34 +71,9 @@ class StructProtocol(Protocol):  # pragma: no cover
     """A generic protocol used by accessors to get and set at positions of an 
object"""
 
     @abstractmethod
-    def get(self, pos: int) -> Any:
+    def __getitem__(self, pos: int) -> Any:
         ...
 
     @abstractmethod
-    def set(self, pos: int, value: Any) -> None:
+    def __setitem__(self, pos: int, value: Any) -> None:
         ...
-
-
-class Record(StructProtocol):

Review Comment:
   I moved these to `iceberg_base_model.py` because of circular references. I'm 
also completely fine with merging `iceberg_base_model.py` into `typedef.py`. 
Also removes one file.



##########
python/tests/table/test_snapshots.py:
##########
@@ -121,40 +119,3 @@ def 
test_snapshot_with_properties_repr(snapshot_with_properties: Snapshot) -> No
         == """Snapshot(snapshot_id=25, parent_snapshot_id=19, 
sequence_number=200, timestamp_ms=1602638573590, 
manifest_list='s3:/a/b/c.avro', summary=Summary(Operation.APPEND, **{'foo': 
'bar'}), schema_id=3)"""
     )
     assert snapshot_with_properties == eval(repr(snapshot_with_properties))
-
-
-def test_fetch_manifest_list(generated_manifest_file_file: str) -> None:

Review Comment:
   Redundant test



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to