smaheshwar-pltr commented on code in PR #3220:
URL: https://github.com/apache/iceberg-python/pull/3220#discussion_r3106467004
##########
pyiceberg/schema.py:
##########
@@ -1380,6 +1380,68 @@ def primitive(self, primitive: PrimitiveType) ->
PrimitiveType:
return primitive
+class _SetFreshIDsForReplace(_SetFreshIDs):
+ """Assign fresh IDs for a replace operation, reusing IDs from the base
schema by field name.
+
+ For each field in the new schema, if a field with the same full name
exists in the
+ base schema, its ID is reused; otherwise a fresh ID is allocated starting
from
+ last_column_id + 1.
+ """
+
+ def __init__(self, old_id_to_base_id: dict[int, int], starting_id: int) ->
None:
+ self.old_id_to_new_id: dict[int, int] = {}
+ self._old_id_to_base_id = old_id_to_base_id
+ counter = itertools.count(starting_id + 1)
+ self.next_id_func = lambda: next(counter)
+
+ def _get_and_increment(self, current_id: int) -> int:
+ if current_id in self._old_id_to_base_id:
+ new_id = self._old_id_to_base_id[current_id]
+ else:
+ new_id = self.next_id_func()
+ self.old_id_to_new_id[current_id] = new_id
+ return new_id
+
+
+def assign_fresh_schema_ids_for_replace(schema: Schema, all_schemas:
list[Schema], last_column_id: int) -> tuple[Schema, int]:
+ """Assign fresh IDs to a schema for a replace operation, reusing IDs from
existing schemas.
+
+ For each field in the new schema, if a field with the same full path name
exists
+ in any of the existing schemas, its ID is reused. New fields get IDs
starting from
+ last_column_id + 1.
+
+ Args:
+ schema: The new schema to assign IDs to.
+ all_schemas: All schemas from the existing table metadata (IDs are
reused from here by name).
+ last_column_id: The current table's last_column_id (new IDs start
above this).
+
+ Returns:
+ A tuple of (fresh_schema, new_last_column_id).
+ """
+ # N.B. We diverge from the Java implementation by using ALL historical
schemas for
+ # field ID reuse, not just the current schema. Java's
TypeUtil.assignFreshIds only
+ # uses the current schema as the base, so a replace A→B→A where A and B
have
+ # disjoint fields would create a 3rd schema (field IDs from A are lost
when B is
+ # current). By using all schemas, we can recover those IDs and correctly
deduplicate
+ # the schema on the way back. This is safe because Iceberg guarantees field
+ # name-to-ID consistency across schema evolution.
Review Comment:
Too wordy. Also, can we not say it's safe because this is what union-by-name
does, as you said? Does it search historical schemas? Are we really sure it's
safe? (I'm not too sure.)
##########
pyiceberg/schema.py:
##########
@@ -1380,6 +1380,68 @@ def primitive(self, primitive: PrimitiveType) ->
PrimitiveType:
return primitive
+class _SetFreshIDsForReplace(_SetFreshIDs):
+ """Assign fresh IDs for a replace operation, reusing IDs from the base
schema by field name.
+
+ For each field in the new schema, if a field with the same full name
exists in the
+ base schema, its ID is reused; otherwise a fresh ID is allocated starting
from
+ last_column_id + 1.
+ """
+
+ def __init__(self, old_id_to_base_id: dict[int, int], starting_id: int) ->
None:
+ self.old_id_to_new_id: dict[int, int] = {}
+ self._old_id_to_base_id = old_id_to_base_id
+ counter = itertools.count(starting_id + 1)
+ self.next_id_func = lambda: next(counter)
+
+ def _get_and_increment(self, current_id: int) -> int:
+ if current_id in self._old_id_to_base_id:
+ new_id = self._old_id_to_base_id[current_id]
+ else:
+ new_id = self.next_id_func()
+ self.old_id_to_new_id[current_id] = new_id
+ return new_id
+
+
+def assign_fresh_schema_ids_for_replace(schema: Schema, all_schemas:
list[Schema], last_column_id: int) -> tuple[Schema, int]:
+ """Assign fresh IDs to a schema for a replace operation, reusing IDs from
existing schemas.
+
+ For each field in the new schema, if a field with the same full path name
exists
+ in any of the existing schemas, its ID is reused. New fields get IDs
starting from
+ last_column_id + 1.
+
+ Args:
+ schema: The new schema to assign IDs to.
+ all_schemas: All schemas from the existing table metadata (IDs are
reused from here by name).
+ last_column_id: The current table's last_column_id (new IDs start
above this).
+
+ Returns:
+ A tuple of (fresh_schema, new_last_column_id).
+ """
+ # N.B. We diverge from the Java implementation by using ALL historical
schemas for
+ # field ID reuse, not just the current schema. Java's
TypeUtil.assignFreshIds only
+ # uses the current schema as the base, so a replace A→B→A where A and B
have
+ # disjoint fields would create a 3rd schema (field IDs from A are lost
when B is
+ # current). By using all schemas, we can recover those IDs and correctly
deduplicate
+ # the schema on the way back. This is safe because Iceberg guarantees field
+ # name-to-ID consistency across schema evolution.
Review Comment:
If we're not 100% sure, we could just be safe and not diverge? We can then
comment on the PR and ask for the community's thoughts.
##########
pyiceberg/schema.py:
##########
@@ -1380,6 +1380,68 @@ def primitive(self, primitive: PrimitiveType) ->
PrimitiveType:
return primitive
+class _SetFreshIDsForReplace(_SetFreshIDs):
+ """Assign fresh IDs for a replace operation, reusing IDs from the base
schema by field name.
+
+ For each field in the new schema, if a field with the same full name
exists in the
+ base schema, its ID is reused; otherwise a fresh ID is allocated starting
from
+ last_column_id + 1.
+ """
+
+ def __init__(self, old_id_to_base_id: dict[int, int], starting_id: int) ->
None:
+ self.old_id_to_new_id: dict[int, int] = {}
+ self._old_id_to_base_id = old_id_to_base_id
+ counter = itertools.count(starting_id + 1)
+ self.next_id_func = lambda: next(counter)
+
+ def _get_and_increment(self, current_id: int) -> int:
+ if current_id in self._old_id_to_base_id:
+ new_id = self._old_id_to_base_id[current_id]
+ else:
+ new_id = self.next_id_func()
+ self.old_id_to_new_id[current_id] = new_id
+ return new_id
+
+
+def assign_fresh_schema_ids_for_replace(schema: Schema, all_schemas:
list[Schema], last_column_id: int) -> tuple[Schema, int]:
+ """Assign fresh IDs to a schema for a replace operation, reusing IDs from
existing schemas.
+
+ For each field in the new schema, if a field with the same full path name
exists
+ in any of the existing schemas, its ID is reused. New fields get IDs
starting from
+ last_column_id + 1.
+
+ Args:
+ schema: The new schema to assign IDs to.
+ all_schemas: All schemas from the existing table metadata (IDs are
reused from here by name).
+ last_column_id: The current table's last_column_id (new IDs start
above this).
+
+ Returns:
+ A tuple of (fresh_schema, new_last_column_id).
+ """
+ # N.B. We diverge from the Java implementation by using ALL historical
schemas for
+ # field ID reuse, not just the current schema. Java's
TypeUtil.assignFreshIds only
+ # uses the current schema as the base, so a replace A→B→A where A and B
have
+ # disjoint fields would create a 3rd schema (field IDs from A are lost
when B is
+ # current). By using all schemas, we can recover those IDs and correctly
deduplicate
+ # the schema on the way back. This is safe because Iceberg guarantees field
+ # name-to-ID consistency across schema evolution.
Review Comment:
Reverted — now uses only the current schema for field ID reuse (matching
Java's `TypeUtil.assignFreshIds`). Removed the divergence comment entirely. The
A→B→A test now expects 3 schemas, which matches Java behavior.
##########
pyiceberg/schema.py:
##########
@@ -1380,6 +1380,68 @@ def primitive(self, primitive: PrimitiveType) ->
PrimitiveType:
return primitive
+class _SetFreshIDsForReplace(_SetFreshIDs):
+ """Assign fresh IDs for a replace operation, reusing IDs from the base
schema by field name.
+
+ For each field in the new schema, if a field with the same full name
exists in the
+ base schema, its ID is reused; otherwise a fresh ID is allocated starting
from
+ last_column_id + 1.
+ """
+
+ def __init__(self, old_id_to_base_id: dict[int, int], starting_id: int) ->
None:
+ self.old_id_to_new_id: dict[int, int] = {}
+ self._old_id_to_base_id = old_id_to_base_id
+ counter = itertools.count(starting_id + 1)
+ self.next_id_func = lambda: next(counter)
+
+ def _get_and_increment(self, current_id: int) -> int:
+ if current_id in self._old_id_to_base_id:
+ new_id = self._old_id_to_base_id[current_id]
+ else:
+ new_id = self.next_id_func()
+ self.old_id_to_new_id[current_id] = new_id
+ return new_id
+
+
+def assign_fresh_schema_ids_for_replace(schema: Schema, all_schemas:
list[Schema], last_column_id: int) -> tuple[Schema, int]:
+ """Assign fresh IDs to a schema for a replace operation, reusing IDs from
existing schemas.
+
+ For each field in the new schema, if a field with the same full path name
exists
+ in any of the existing schemas, its ID is reused. New fields get IDs
starting from
+ last_column_id + 1.
+
+ Args:
+ schema: The new schema to assign IDs to.
+ all_schemas: All schemas from the existing table metadata (IDs are
reused from here by name).
+ last_column_id: The current table's last_column_id (new IDs start
above this).
+
+ Returns:
+ A tuple of (fresh_schema, new_last_column_id).
+ """
+ # N.B. We diverge from the Java implementation by using ALL historical
schemas for
+ # field ID reuse, not just the current schema. Java's
TypeUtil.assignFreshIds only
+ # uses the current schema as the base, so a replace A→B→A where A and B
have
+ # disjoint fields would create a 3rd schema (field IDs from A are lost
when B is
+ # current). By using all schemas, we can recover those IDs and correctly
deduplicate
+ # the schema on the way back. This is safe because Iceberg guarantees field
+ # name-to-ID consistency across schema evolution.
Review Comment:
Agreed — reverted to match Java. We can raise the all-schemas idea with the
community as a follow-up if there's interest.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]