HonahX commented on code in PR #245:
URL: https://github.com/apache/iceberg-python/pull/245#discussion_r1469034571
##########
pyiceberg/table/__init__.py:
##########
@@ -533,6 +551,39 @@ def _(update: SetCurrentSchemaUpdate, base_metadata:
TableMetadata, context: _Ta
return base_metadata.model_copy(update={"current_schema_id":
new_schema_id})
+@_apply_table_update.register(AddPartitionSpecUpdate)
+def _(update: AddPartitionSpecUpdate, base_metadata: TableMetadata, context:
_TableMetadataUpdateContext) -> TableMetadata:
+ for spec in base_metadata.partition_specs:
+ if spec.spec_id == update.spec_id:
+ raise ValueError(f"Partition spec with id {spec.spec_id} already
exists: {spec}")
+
+ context.add_update(update)
+ return base_metadata.model_copy(
+ update={
+ "partition_specs": base_metadata.partition_specs + [update.spec],
+ }
+ )
+
+
+@_apply_table_update.register(SetDefaultSpecUpdate)
+def _(update: SetDefaultSpecUpdate, base_metadata: TableMetadata, context:
_TableMetadataUpdateContext) -> TableMetadata:
+ new_spec_id = update.spec_id
+ if new_spec_id == base_metadata.default_spec_id:
Review Comment:
Shall we add support for a `-1` spec_id, which represents the last spec
added in this transaction?
https://github.com/apache/iceberg/blob/6852278d6cf01bec2998be954d7bd6f6cc37cc94/open-api/rest-catalog-open-api.yaml#L2242
##########
pyiceberg/table/__init__.py:
##########
@@ -2271,3 +2325,240 @@ def commit(self) -> Snapshot:
)
return snapshot
+
+
+class UpdateSpec:
+ _table: Table
+ _name_to_field: Dict[str, PartitionField] = {}
+ _name_to_added_field: Dict[str, PartitionField] = {}
+ _transform_to_field: Dict[Tuple[int, str], PartitionField] = {}
+ _transform_to_added_field: Dict[Tuple[int, str], PartitionField] = {}
+ _renames: Dict[str, str] = {}
+ _added_time_fields: Dict[int, PartitionField] = {}
+ _case_sensitive: bool
+ _adds: List[PartitionField]
+ _deletes: Set[int]
+ _last_assigned_partition_id: int
+ _transaction: Optional[Transaction]
+ _unassigned_field_name = 'unassigned_field_name'
Review Comment:
Thanks for the analysis! I agree that we should keep the validation. I was
thinking a little bit of refactoring may make things look better. I've left a
comment at the place where we provide the name for the new field. Please let me
know what you think.
##########
pyiceberg/table/__init__.py:
##########
@@ -2271,3 +2325,243 @@ def commit(self) -> Snapshot:
)
return snapshot
+
+
+class UpdateSpec:
+ _table: Table
+ _name_to_field: Dict[str, PartitionField] = {}
+ _name_to_added_field: Dict[str, PartitionField] = {}
+ _transform_to_field: Dict[Tuple[int, str], PartitionField] = {}
+ _transform_to_added_field: Dict[Tuple[int, str], PartitionField] = {}
+ _renames: Dict[str, str] = {}
+ _added_time_fields: Dict[int, PartitionField] = {}
+ _case_sensitive: bool
+ _adds: List[PartitionField]
+ _deletes: Set[int]
+ _last_assigned_partition_id: int
+ _transaction: Optional[Transaction]
+ _unassigned_field_name = 'unassigned_field_name'
+
+ def __init__(self, table: Table, transaction: Optional[Transaction] =
None, case_sensitive: bool = True) -> None:
+ self._table = table
+ self._name_to_field = {field.name: field for field in
table.spec().fields}
+ self._name_to_added_field = {}
+ self._transform_to_field = {(field.source_id, repr(field.transform)):
field for field in table.spec().fields}
+ self._transform_to_added_field = {}
+ self._adds = []
+ self._deletes = set()
+ if len(table.specs()) == 1:
+ self._last_assigned_partition_id = PARTITION_FIELD_ID_START - 1
+ else:
+ self._last_assigned_partition_id =
table.spec().last_assigned_field_id
+ self._renames = {}
+ self._transaction = transaction
+ self._case_sensitive = case_sensitive
+ self._added_time_fields = {}
+
+ def add_field(
+ self,
+ source_column_name: str,
+ transform: Transform[Any, Any],
+ partition_field_name: Optional[str] = _unassigned_field_name,
+ ) -> UpdateSpec:
+ ref = Reference(source_column_name)
+ bound_ref = ref.bind(self._table.schema(), self._case_sensitive)
+ # verify transform can actually bind it
+ output_type = bound_ref.field.field_type
+ if not transform.can_transform(output_type):
+ raise ValueError(f"{transform} cannot transform {output_type}
values from {bound_ref.field.name}")
+
+ transform_key = (bound_ref.field.field_id, repr(transform))
+ existing_partition_field = self._transform_to_field.get(transform_key)
+ if existing_partition_field and
self._is_duplicate_partition(transform, existing_partition_field):
+ raise ValueError(f"Duplicate partition field for
${ref.name}=${ref}, ${existing_partition_field} already exists")
+
+ added = self._transform_to_added_field.get(transform_key)
+ if added:
+ raise ValueError(f"Already added partition: {added.name}")
+
+ new_field = self._partition_field((bound_ref.field.field_id,
transform), partition_field_name)
+ if new_field.name == self._unassigned_field_name:
Review Comment:
How about moving the unassigned case inside `_partition_field`? This way we
only need to do a None-check inside `_partition_field`, and
'unassigned_field_name' becomes a temp name for a temp PartitionField.
```python
def _partition_field(self, transform_key: Tuple[int, Transform[Any, Any]],
name: Optional[str]) -> PartitionField:
...
if name is None:
tmp_field = PartitionField(transform_key[0], self._new_field_id(),
transform_key[1], "unassigned_field_name")
name = visit_partition_field(self._table.schema(), tmp_field,
_PartitionNameGenerator())
return PartitionField(transform_key[0], self._new_field_id(),
transform_key[1], name)
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]