smaheshwar-pltr commented on code in PR #3220:
URL: https://github.com/apache/iceberg-python/pull/3220#discussion_r3106472611
##########
pyiceberg/schema.py:
##########
@@ -1380,6 +1380,68 @@ def primitive(self, primitive: PrimitiveType) ->
PrimitiveType:
return primitive
+class _SetFreshIDsForReplace(_SetFreshIDs):
+ """Assign fresh IDs for a replace operation, reusing IDs from the base
schema by field name.
+
+ For each field in the new schema, if a field with the same full name
exists in the
+ base schema, its ID is reused; otherwise a fresh ID is allocated starting
from
+ last_column_id + 1.
+ """
+
+ def __init__(self, old_id_to_base_id: dict[int, int], starting_id: int) ->
None:
+ self.old_id_to_new_id: dict[int, int] = {}
+ self._old_id_to_base_id = old_id_to_base_id
+ counter = itertools.count(starting_id + 1)
+ self.next_id_func = lambda: next(counter)
+
+ def _get_and_increment(self, current_id: int) -> int:
+ if current_id in self._old_id_to_base_id:
+ new_id = self._old_id_to_base_id[current_id]
+ else:
+ new_id = self.next_id_func()
+ self.old_id_to_new_id[current_id] = new_id
+ return new_id
+
+
+def assign_fresh_schema_ids_for_replace(schema: Schema, all_schemas:
list[Schema], last_column_id: int) -> tuple[Schema, int]:
+ """Assign fresh IDs to a schema for a replace operation, reusing IDs from
existing schemas.
+
+ For each field in the new schema, if a field with the same full path name
exists
+ in any of the existing schemas, its ID is reused. New fields get IDs
starting from
+ last_column_id + 1.
+
+ Args:
+ schema: The new schema to assign IDs to.
+ all_schemas: All schemas from the existing table metadata (IDs are
reused from here by name).
+ last_column_id: The current table's last_column_id (new IDs start
above this).
+
+ Returns:
+ A tuple of (fresh_schema, new_last_column_id).
+ """
+ # N.B. We diverge from the Java implementation by using ALL historical
schemas for
+ # field ID reuse, not just the current schema. Java's
TypeUtil.assignFreshIds only
+ # uses the current schema as the base, so a replace A→B→A where A and B
have
+ # disjoint fields would create a 3rd schema (field IDs from A are lost
when B is
+ # current). By using all schemas, we can recover those IDs and correctly
deduplicate
+ # the schema on the way back. This is safe because Iceberg guarantees field
+ # name-to-ID consistency across schema evolution.
Review Comment:
Reverted — now uses only the current schema for field ID reuse (matching
Java's `TypeUtil.assignFreshIds`). Removed the divergence comment entirely. The
A→B→A test now expects 3 schemas, which matches Java behavior.
##########
pyiceberg/table/__init__.py:
##########
@@ -1009,6 +1010,131 @@ def commit_transaction(self) -> Table:
return self._table
+class ReplaceTableTransaction(Transaction):
+ """A transaction that replaces an existing table's schema, spec, sort
order, location, and properties.
+
+ The existing table UUID, snapshots, snapshot log, metadata log, and
history are preserved.
+ The "main" branch ref is removed (current-snapshot-id set to -1), and new
+ schema/spec/sort-order/location/properties are applied.
+ """
+
+ def _initial_changes(
+ self,
+ table_metadata: TableMetadata,
+ new_schema: Schema,
+ new_spec: PartitionSpec,
+ new_sort_order: SortOrder,
+ new_location: str,
+ new_properties: Properties,
+ ) -> None:
+ """Set the initial changes that transform the existing table into the
replacement."""
+ # Remove the main branch ref to clear the current snapshot
+ self._updates += (RemoveSnapshotRefUpdate(ref_name=MAIN_BRANCH),)
+
+ # Set the new schema, reusing an existing schema if structurally
identical.
+ # Schema.__eq__ compares fields and identifier_field_ids (ignoring
schema_id),
+ # matching Java's sameSchema() behavior.
Review Comment:
Done.
##########
pyiceberg/table/__init__.py:
##########
@@ -1009,6 +1010,131 @@ def commit_transaction(self) -> Table:
return self._table
+class ReplaceTableTransaction(Transaction):
+ """A transaction that replaces an existing table's schema, spec, sort
order, location, and properties.
+
+ The existing table UUID, snapshots, snapshot log, metadata log, and
history are preserved.
+ The "main" branch ref is removed (current-snapshot-id set to -1), and new
+ schema/spec/sort-order/location/properties are applied.
+ """
+
+ def _initial_changes(
+ self,
+ table_metadata: TableMetadata,
+ new_schema: Schema,
+ new_spec: PartitionSpec,
+ new_sort_order: SortOrder,
+ new_location: str,
+ new_properties: Properties,
+ ) -> None:
+ """Set the initial changes that transform the existing table into the
replacement."""
+ # Remove the main branch ref to clear the current snapshot
+ self._updates += (RemoveSnapshotRefUpdate(ref_name=MAIN_BRANCH),)
+
+ # Set the new schema, reusing an existing schema if structurally
identical.
+ # Schema.__eq__ compares fields and identifier_field_ids (ignoring
schema_id),
+ # matching Java's sameSchema() behavior.
+ existing_schema_id = self._find_matching_schema_id(table_metadata,
new_schema)
+ if existing_schema_id is not None:
+ if existing_schema_id != table_metadata.current_schema_id:
+ self._updates +=
(SetCurrentSchemaUpdate(schema_id=existing_schema_id),)
+ else:
+ self._updates += (
+ AddSchemaUpdate(schema_=new_schema),
+ SetCurrentSchemaUpdate(schema_id=-1),
+ )
+
+ # Set the new partition spec.
+ # Only emit AddPartitionSpecUpdate + SetDefaultSpecUpdate(-1) when the
spec
+ # is genuinely new. If an identical spec already exists, use its
concrete ID
Review Comment:
Done.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]