Fokko commented on code in PR #139:
URL: https://github.com/apache/iceberg-python/pull/139#discussion_r1398847839


##########
pyiceberg/table/__init__.py:
##########
@@ -350,6 +357,241 @@ class RemovePropertiesUpdate(TableUpdate):
     removals: List[str]
 
 
+class TableMetadataUpdateContext:

Review Comment:
   I think we can mark this one private for the module:
   ```suggestion
   class _TableMetadataUpdateContext:
   ```



##########
pyiceberg/table/__init__.py:
##########
@@ -350,6 +357,241 @@ class RemovePropertiesUpdate(TableUpdate):
     removals: List[str]
 
 
+class TableMetadataUpdateContext:
+    updates: List[TableUpdate]

Review Comment:
   I think we should mark this one private for the class:
   ```suggestion
       _updates: List[TableUpdate]
   ```



##########
pyiceberg/table/__init__.py:
##########
@@ -350,6 +357,241 @@ class RemovePropertiesUpdate(TableUpdate):
     removals: List[str]
 
 
+class TableMetadataUpdateContext:
+    updates: List[TableUpdate]
+    last_added_schema_id: Optional[int]
+
+    def __init__(self) -> None:
+        self.updates = []
+        self.last_added_schema_id = None
+
+    def is_added_snapshot(self, snapshot_id: int) -> bool:
+        return any(
+            update.snapshot.snapshot_id == snapshot_id
+            for update in self.updates
+            if update.action == TableUpdateAction.add_snapshot
+        )
+
+    def is_added_schema(self, schema_id: int) -> bool:
+        return any(
+            update.schema_.schema_id == schema_id for update in self.updates 
if update.action == TableUpdateAction.add_schema
+        )
+
+
+@singledispatch

Review Comment:
   Love the `@singledispatch` here!



##########
pyiceberg/table/__init__.py:
##########
@@ -350,6 +357,241 @@ class RemovePropertiesUpdate(TableUpdate):
     removals: List[str]
 
 
+class TableMetadataUpdateContext:
+    updates: List[TableUpdate]
+    last_added_schema_id: Optional[int]

Review Comment:
   It looks like we don't use this one?
   ```suggestion
   ```



##########
pyiceberg/table/__init__.py:
##########
@@ -350,6 +357,241 @@ class RemovePropertiesUpdate(TableUpdate):
     removals: List[str]
 
 
+class TableMetadataUpdateContext:
+    updates: List[TableUpdate]
+    last_added_schema_id: Optional[int]
+
+    def __init__(self) -> None:
+        self.updates = []
+        self.last_added_schema_id = None
+
+    def is_added_snapshot(self, snapshot_id: int) -> bool:
+        return any(
+            update.snapshot.snapshot_id == snapshot_id
+            for update in self.updates
+            if update.action == TableUpdateAction.add_snapshot
+        )
+
+    def is_added_schema(self, schema_id: int) -> bool:
+        return any(
+            update.schema_.schema_id == schema_id for update in self.updates 
if update.action == TableUpdateAction.add_schema
+        )
+
+
+@singledispatch
+def apply_table_update(update: TableUpdate, base_metadata: TableMetadata, 
context: TableMetadataUpdateContext) -> TableMetadata:
+    """Apply a table update to the table metadata.
+
+    Args:
+        update: The update to be applied.
+        base_metadata: The base metadata to be updated.
+        context: Contains previous updates, last_added_snapshot_id and other 
change tracking information in the current transaction.
+
+    Returns:
+        The updated metadata.
+
+    """
+    raise NotImplementedError(f"Unsupported table update: {update}")
+
+
+@apply_table_update.register(UpgradeFormatVersionUpdate)
+def _(update: UpgradeFormatVersionUpdate, base_metadata: TableMetadata, 
context: TableMetadataUpdateContext) -> TableMetadata:
+    if update.format_version > SUPPORTED_TABLE_FORMAT_VERSION:
+        raise ValueError(f"Unsupported table format version: 
{update.format_version}")
+
+    if update.format_version < base_metadata.format_version:
+        raise ValueError(f"Cannot downgrade v{base_metadata.format_version} 
table to v{update.format_version}")
+
+    if update.format_version == base_metadata.format_version:
+        return base_metadata

Review Comment:
   Nit: since they are exclusive, I'd prefer to chain them with `elif`:
   ```suggestion
       if update.format_version > SUPPORTED_TABLE_FORMAT_VERSION:
           raise ValueError(f"Unsupported table format version: 
{update.format_version}")
       elif update.format_version < base_metadata.format_version:
           raise ValueError(f"Cannot downgrade v{base_metadata.format_version} 
table to v{update.format_version}")
       elif update.format_version == base_metadata.format_version:
           return base_metadata
   ```



##########
pyiceberg/table/__init__.py:
##########
@@ -350,6 +357,241 @@ class RemovePropertiesUpdate(TableUpdate):
     removals: List[str]
 
 
+class TableMetadataUpdateContext:
+    updates: List[TableUpdate]
+    last_added_schema_id: Optional[int]
+
+    def __init__(self) -> None:
+        self.updates = []
+        self.last_added_schema_id = None
+
+    def is_added_snapshot(self, snapshot_id: int) -> bool:
+        return any(
+            update.snapshot.snapshot_id == snapshot_id
+            for update in self.updates
+            if update.action == TableUpdateAction.add_snapshot
+        )
+
+    def is_added_schema(self, schema_id: int) -> bool:
+        return any(
+            update.schema_.schema_id == schema_id for update in self.updates 
if update.action == TableUpdateAction.add_schema
+        )
+
+
+@singledispatch
+def apply_table_update(update: TableUpdate, base_metadata: TableMetadata, 
context: TableMetadataUpdateContext) -> TableMetadata:
+    """Apply a table update to the table metadata.
+
+    Args:
+        update: The update to be applied.
+        base_metadata: The base metadata to be updated.
+        context: Contains previous updates, last_added_snapshot_id and other 
change tracking information in the current transaction.
+
+    Returns:
+        The updated metadata.
+
+    """
+    raise NotImplementedError(f"Unsupported table update: {update}")
+
+
+@apply_table_update.register(UpgradeFormatVersionUpdate)
+def _(update: UpgradeFormatVersionUpdate, base_metadata: TableMetadata, 
context: TableMetadataUpdateContext) -> TableMetadata:
+    if update.format_version > SUPPORTED_TABLE_FORMAT_VERSION:
+        raise ValueError(f"Unsupported table format version: 
{update.format_version}")
+
+    if update.format_version < base_metadata.format_version:
+        raise ValueError(f"Cannot downgrade v{base_metadata.format_version} 
table to v{update.format_version}")
+
+    if update.format_version == base_metadata.format_version:
+        return base_metadata
+
+    updated_metadata_data = copy(base_metadata.model_dump())
+    updated_metadata_data["format-version"] = update.format_version

Review Comment:
   While this is a very safe way of doing the copy, it is also rather expensive 
since we convert everything to a Python dict, and then create a new object 
again. Pydantic has the `model_copy` method that seems to do what we're 
looking for:
   ```suggestion
       updated_metadata_data = base_metadata.model_copy(update={"format_version": 
update.format_version})
   ```
   
   This will make a shallow copy by default (which I think is okay, since the 
model is immutable).



##########
pyiceberg/table/__init__.py:
##########
@@ -350,6 +357,241 @@ class RemovePropertiesUpdate(TableUpdate):
     removals: List[str]
 
 
+class TableMetadataUpdateContext:
+    updates: List[TableUpdate]
+    last_added_schema_id: Optional[int]
+
+    def __init__(self) -> None:
+        self.updates = []
+        self.last_added_schema_id = None
+
+    def is_added_snapshot(self, snapshot_id: int) -> bool:
+        return any(
+            update.snapshot.snapshot_id == snapshot_id
+            for update in self.updates
+            if update.action == TableUpdateAction.add_snapshot
+        )
+
+    def is_added_schema(self, schema_id: int) -> bool:
+        return any(
+            update.schema_.schema_id == schema_id for update in self.updates 
if update.action == TableUpdateAction.add_schema
+        )
+
+
+@singledispatch
+def apply_table_update(update: TableUpdate, base_metadata: TableMetadata, 
context: TableMetadataUpdateContext) -> TableMetadata:
+    """Apply a table update to the table metadata.
+
+    Args:
+        update: The update to be applied.
+        base_metadata: The base metadata to be updated.
+        context: Contains previous updates, last_added_snapshot_id and other 
change tracking information in the current transaction.
+
+    Returns:
+        The updated metadata.
+
+    """
+    raise NotImplementedError(f"Unsupported table update: {update}")
+
+
+@apply_table_update.register(UpgradeFormatVersionUpdate)
+def _(update: UpgradeFormatVersionUpdate, base_metadata: TableMetadata, 
context: TableMetadataUpdateContext) -> TableMetadata:
+    if update.format_version > SUPPORTED_TABLE_FORMAT_VERSION:
+        raise ValueError(f"Unsupported table format version: 
{update.format_version}")
+
+    if update.format_version < base_metadata.format_version:
+        raise ValueError(f"Cannot downgrade v{base_metadata.format_version} 
table to v{update.format_version}")
+
+    if update.format_version == base_metadata.format_version:
+        return base_metadata
+
+    updated_metadata_data = copy(base_metadata.model_dump())
+    updated_metadata_data["format-version"] = update.format_version
+
+    context.updates.append(update)
+    return TableMetadataUtil.parse_obj(updated_metadata_data)
+
+
+@apply_table_update.register(AddSchemaUpdate)
+def _(update: AddSchemaUpdate, base_metadata: TableMetadata, context: 
TableMetadataUpdateContext) -> TableMetadata:
+    def reuse_or_create_new_schema_id(new_schema: Schema) -> Tuple[int, bool]:
+        """Reuse schema id if schema already exists, otherwise create a new 
one.
+
+        Args:
+            new_schema: The new schema to be added.
+
+        Returns:
+            The new schema id and whether the schema already exists.
+        """
+        result_schema_id = base_metadata.current_schema_id
+        for schema in base_metadata.schemas:
+            if schema == new_schema:
+                return schema.schema_id, True
+            elif schema.schema_id >= result_schema_id:
+                result_schema_id = schema.schema_id + 1
+        return result_schema_id, False
+
+    if update.last_column_id < base_metadata.last_column_id:
+        raise ValueError(f"Invalid last column id {update.last_column_id}, 
must be >= {base_metadata.last_column_id}")
+
+    new_schema_id, schema_found = reuse_or_create_new_schema_id(update.schema_)
+    if schema_found and update.last_column_id == base_metadata.last_column_id:
+        if context.last_added_schema_id is not None and 
context.is_added_schema(new_schema_id):
+            context.last_added_schema_id = new_schema_id
+        return base_metadata
+
+    updated_metadata_data = copy(base_metadata.model_dump())
+    updated_metadata_data["last-column-id"] = update.last_column_id
+
+    new_schema = (
+        update.schema_
+        if new_schema_id == update.schema_.schema_id
+        else Schema(*update.schema_.fields, schema_id=new_schema_id, 
identifier_field_ids=update.schema_.identifier_field_ids)
+    )
+
+    if not schema_found:
+        updated_metadata_data["schemas"].append(new_schema.model_dump())
+
+    context.updates.append(update)
+    context.last_added_schema_id = new_schema_id
+    return TableMetadataUtil.parse_obj(updated_metadata_data)
+
+
+@apply_table_update.register(SetCurrentSchemaUpdate)
+def _(update: SetCurrentSchemaUpdate, base_metadata: TableMetadata, context: 
TableMetadataUpdateContext) -> TableMetadata:
+    if update.schema_id == -1:
+        if context.last_added_schema_id is None:
+            raise ValueError("Cannot set current schema to last added schema 
when no schema has been added")
+        return 
apply_table_update(SetCurrentSchemaUpdate(schema_id=context.last_added_schema_id),
 base_metadata, context)
+
+    if update.schema_id == base_metadata.current_schema_id:
+        return base_metadata
+
+    schema = base_metadata.schemas_by_id.get(update.schema_id)
+    if schema is None:
+        raise ValueError(f"Schema with id {update.schema_id} does not exist")
+
+    updated_metadata_data = copy(base_metadata.model_dump())
+    updated_metadata_data["current-schema-id"] = update.schema_id
+
+    if context.last_added_schema_id is not None and 
context.last_added_schema_id == update.schema_id:
+        context.updates.append(SetCurrentSchemaUpdate(schema_id=-1))
+    else:
+        context.updates.append(update)
+
+    return TableMetadataUtil.parse_obj(updated_metadata_data)
+
+
+@apply_table_update.register(AddSnapshotUpdate)
+def _(update: AddSnapshotUpdate, base_metadata: TableMetadata, context: 
TableMetadataUpdateContext) -> TableMetadata:
+    if len(base_metadata.schemas) == 0:
+        raise ValueError("Attempting to add a snapshot before a schema is 
added")
+
+    if len(base_metadata.partition_specs) == 0:
+        raise ValueError("Attempting to add a snapshot before a partition spec 
is added")
+
+    if len(base_metadata.sort_orders) == 0:
+        raise ValueError("Attempting to add a snapshot before a sort order is 
added")
+
+    if base_metadata.snapshots_by_id.get(update.snapshot.snapshot_id) is not 
None:
+        raise ValueError(f"Snapshot with id {update.snapshot.snapshot_id} 
already exists")

Review Comment:
   ```suggestion
       if len(base_metadata.schemas) == 0:
           raise ValueError("Attempting to add a snapshot before a schema is 
added")
       elif len(base_metadata.partition_specs) == 0:
           raise ValueError("Attempting to add a snapshot before a partition 
spec is added")
       elif len(base_metadata.sort_orders) == 0:
           raise ValueError("Attempting to add a snapshot before a sort order 
is added")
       elif base_metadata.snapshots_by_id.get(update.snapshot.snapshot_id) is 
not None:
           raise ValueError(f"Snapshot with id {update.snapshot.snapshot_id} 
already exists")
   ```



##########
pyiceberg/table/__init__.py:
##########
@@ -350,6 +357,241 @@ class RemovePropertiesUpdate(TableUpdate):
     removals: List[str]
 
 
+class TableMetadataUpdateContext:
+    updates: List[TableUpdate]
+    last_added_schema_id: Optional[int]
+
+    def __init__(self) -> None:
+        self.updates = []
+        self.last_added_schema_id = None
+
+    def is_added_snapshot(self, snapshot_id: int) -> bool:
+        return any(
+            update.snapshot.snapshot_id == snapshot_id
+            for update in self.updates
+            if update.action == TableUpdateAction.add_snapshot
+        )
+
+    def is_added_schema(self, schema_id: int) -> bool:
+        return any(
+            update.schema_.schema_id == schema_id for update in self.updates 
if update.action == TableUpdateAction.add_schema
+        )
+
+
+@singledispatch
+def apply_table_update(update: TableUpdate, base_metadata: TableMetadata, 
context: TableMetadataUpdateContext) -> TableMetadata:
+    """Apply a table update to the table metadata.
+
+    Args:
+        update: The update to be applied.
+        base_metadata: The base metadata to be updated.
+        context: Contains previous updates, last_added_snapshot_id and other 
change tracking information in the current transaction.
+
+    Returns:
+        The updated metadata.
+
+    """
+    raise NotImplementedError(f"Unsupported table update: {update}")
+
+
+@apply_table_update.register(UpgradeFormatVersionUpdate)
+def _(update: UpgradeFormatVersionUpdate, base_metadata: TableMetadata, 
context: TableMetadataUpdateContext) -> TableMetadata:
+    if update.format_version > SUPPORTED_TABLE_FORMAT_VERSION:
+        raise ValueError(f"Unsupported table format version: 
{update.format_version}")
+
+    if update.format_version < base_metadata.format_version:
+        raise ValueError(f"Cannot downgrade v{base_metadata.format_version} 
table to v{update.format_version}")
+
+    if update.format_version == base_metadata.format_version:
+        return base_metadata
+
+    updated_metadata_data = copy(base_metadata.model_dump())
+    updated_metadata_data["format-version"] = update.format_version
+
+    context.updates.append(update)
+    return TableMetadataUtil.parse_obj(updated_metadata_data)
+
+
+@apply_table_update.register(AddSchemaUpdate)
+def _(update: AddSchemaUpdate, base_metadata: TableMetadata, context: 
TableMetadataUpdateContext) -> TableMetadata:
+    def reuse_or_create_new_schema_id(new_schema: Schema) -> Tuple[int, bool]:
+        """Reuse schema id if schema already exists, otherwise create a new 
one.
+
+        Args:
+            new_schema: The new schema to be added.
+
+        Returns:
+            The new schema id and whether the schema already exists.
+        """
+        result_schema_id = base_metadata.current_schema_id
+        for schema in base_metadata.schemas:
+            if schema == new_schema:
+                return schema.schema_id, True
+            elif schema.schema_id >= result_schema_id:
+                result_schema_id = schema.schema_id + 1
+        return result_schema_id, False
+
+    if update.last_column_id < base_metadata.last_column_id:
+        raise ValueError(f"Invalid last column id {update.last_column_id}, 
must be >= {base_metadata.last_column_id}")
+
+    new_schema_id, schema_found = reuse_or_create_new_schema_id(update.schema_)
+    if schema_found and update.last_column_id == base_metadata.last_column_id:
+        if context.last_added_schema_id is not None and 
context.is_added_schema(new_schema_id):
+            context.last_added_schema_id = new_schema_id
+        return base_metadata
+
+    updated_metadata_data = copy(base_metadata.model_dump())
+    updated_metadata_data["last-column-id"] = update.last_column_id
+
+    new_schema = (
+        update.schema_
+        if new_schema_id == update.schema_.schema_id
+        else Schema(*update.schema_.fields, schema_id=new_schema_id, 
identifier_field_ids=update.schema_.identifier_field_ids)
+    )
+
+    if not schema_found:
+        updated_metadata_data["schemas"].append(new_schema.model_dump())
+
+    context.updates.append(update)
+    context.last_added_schema_id = new_schema_id
+    return TableMetadataUtil.parse_obj(updated_metadata_data)
+
+
+@apply_table_update.register(SetCurrentSchemaUpdate)
+def _(update: SetCurrentSchemaUpdate, base_metadata: TableMetadata, context: 
TableMetadataUpdateContext) -> TableMetadata:
+    if update.schema_id == -1:
+        if context.last_added_schema_id is None:
+            raise ValueError("Cannot set current schema to last added schema 
when no schema has been added")
+        return 
apply_table_update(SetCurrentSchemaUpdate(schema_id=context.last_added_schema_id),
 base_metadata, context)
+
+    if update.schema_id == base_metadata.current_schema_id:
+        return base_metadata
+
+    schema = base_metadata.schemas_by_id.get(update.schema_id)
+    if schema is None:
+        raise ValueError(f"Schema with id {update.schema_id} does not exist")
+
+    updated_metadata_data = copy(base_metadata.model_dump())
+    updated_metadata_data["current-schema-id"] = update.schema_id
+
+    if context.last_added_schema_id is not None and 
context.last_added_schema_id == update.schema_id:
+        context.updates.append(SetCurrentSchemaUpdate(schema_id=-1))
+    else:
+        context.updates.append(update)
+
+    return TableMetadataUtil.parse_obj(updated_metadata_data)
+
+
+@apply_table_update.register(AddSnapshotUpdate)
+def _(update: AddSnapshotUpdate, base_metadata: TableMetadata, context: 
TableMetadataUpdateContext) -> TableMetadata:
+    if len(base_metadata.schemas) == 0:
+        raise ValueError("Attempting to add a snapshot before a schema is 
added")
+
+    if len(base_metadata.partition_specs) == 0:
+        raise ValueError("Attempting to add a snapshot before a partition spec 
is added")
+
+    if len(base_metadata.sort_orders) == 0:
+        raise ValueError("Attempting to add a snapshot before a sort order is 
added")
+
+    if base_metadata.snapshots_by_id.get(update.snapshot.snapshot_id) is not 
None:
+        raise ValueError(f"Snapshot with id {update.snapshot.snapshot_id} 
already exists")
+
+    if (
+        base_metadata.format_version == 2
+        and update.snapshot.sequence_number is not None
+        and update.snapshot.sequence_number <= 
base_metadata.last_sequence_number
+        and update.snapshot.parent_snapshot_id is not None
+    ):
+        raise ValueError(
+            f"Cannot add snapshot with sequence number 
{update.snapshot.sequence_number} "
+            f"older than last sequence number 
{base_metadata.last_sequence_number}"
+        )
+
+    updated_metadata_data = copy(base_metadata.model_dump())
+    updated_metadata_data["last-updated-ms"] = update.snapshot.timestamp_ms
+    updated_metadata_data["last-sequence-number"] = 
update.snapshot.sequence_number
+    updated_metadata_data["snapshots"].append(update.snapshot.model_dump())
+    context.updates.append(update)
+    return TableMetadataUtil.parse_obj(updated_metadata_data)
+
+
+@apply_table_update.register(SetSnapshotRefUpdate)
+def _(update: SetSnapshotRefUpdate, base_metadata: TableMetadata, context: 
TableMetadataUpdateContext) -> TableMetadata:
+    if update.type is None:
+        raise ValueError("Snapshot ref type must be set")
+
+    if update.min_snapshots_to_keep is not None and update.type == 
SnapshotRefType.TAG:
+        raise ValueError("Cannot set min snapshots to keep for branch refs")
+
+    if update.min_snapshots_to_keep is not None and 
update.min_snapshots_to_keep <= 0:
+        raise ValueError("Minimum snapshots to keep must be >= 0")
+
+    if update.max_snapshot_age_ms is not None and update.type == 
SnapshotRefType.TAG:
+        raise ValueError("Tags do not support setting maxSnapshotAgeMs")
+
+    if update.max_snapshot_age_ms is not None and update.max_snapshot_age_ms 
<= 0:
+        raise ValueError("Max snapshot age must be > 0 ms")
+
+    if update.max_ref_age_ms is not None and update.max_ref_age_ms <= 0:
+        raise ValueError("Max ref age must be > 0 ms")

Review Comment:
   ```suggestion
       if update.type is None:
           raise ValueError("Snapshot ref type must be set")
       elif update.min_snapshots_to_keep is not None and update.type == 
SnapshotRefType.TAG:
           raise ValueError("Cannot set min snapshots to keep for branch refs")
       elif update.min_snapshots_to_keep is not None and 
update.min_snapshots_to_keep <= 0:
           raise ValueError("Minimum snapshots to keep must be >= 0")
       elif update.max_snapshot_age_ms is not None and update.type == 
SnapshotRefType.TAG:
           raise ValueError("Tags do not support setting maxSnapshotAgeMs")
       elif update.max_snapshot_age_ms is not None and 
update.max_snapshot_age_ms <= 0:
           raise ValueError("Max snapshot age must be > 0 ms")
       elif update.max_ref_age_ms is not None and update.max_ref_age_ms <= 0:
           raise ValueError("Max ref age must be > 0 ms")
   ```
   
   Some more observations:
   
   - The first check, `update.type is None`, should never be true, because 
`type` is a required field.
   - We could also enforce the `> 0` constraints using Pydantic: 
https://docs.pydantic.dev/latest/concepts/types/#composing-types-via-annotated



##########
pyiceberg/table/__init__.py:
##########
@@ -350,6 +357,241 @@ class RemovePropertiesUpdate(TableUpdate):
     removals: List[str]
 
 
+class TableMetadataUpdateContext:
+    updates: List[TableUpdate]
+    last_added_schema_id: Optional[int]
+
+    def __init__(self) -> None:
+        self.updates = []
+        self.last_added_schema_id = None
+
+    def is_added_snapshot(self, snapshot_id: int) -> bool:
+        return any(
+            update.snapshot.snapshot_id == snapshot_id
+            for update in self.updates
+            if update.action == TableUpdateAction.add_snapshot
+        )
+
+    def is_added_schema(self, schema_id: int) -> bool:
+        return any(
+            update.schema_.schema_id == schema_id for update in self.updates 
if update.action == TableUpdateAction.add_schema
+        )
+
+
+@singledispatch
+def apply_table_update(update: TableUpdate, base_metadata: TableMetadata, 
context: TableMetadataUpdateContext) -> TableMetadata:
+    """Apply a table update to the table metadata.
+
+    Args:
+        update: The update to be applied.
+        base_metadata: The base metadata to be updated.
+        context: Contains previous updates, last_added_snapshot_id and other 
change tracking information in the current transaction.
+
+    Returns:
+        The updated metadata.
+
+    """
+    raise NotImplementedError(f"Unsupported table update: {update}")
+
+
+@apply_table_update.register(UpgradeFormatVersionUpdate)
+def _(update: UpgradeFormatVersionUpdate, base_metadata: TableMetadata, 
context: TableMetadataUpdateContext) -> TableMetadata:
+    if update.format_version > SUPPORTED_TABLE_FORMAT_VERSION:
+        raise ValueError(f"Unsupported table format version: 
{update.format_version}")
+
+    if update.format_version < base_metadata.format_version:
+        raise ValueError(f"Cannot downgrade v{base_metadata.format_version} 
table to v{update.format_version}")
+
+    if update.format_version == base_metadata.format_version:
+        return base_metadata
+
+    updated_metadata_data = copy(base_metadata.model_dump())
+    updated_metadata_data["format-version"] = update.format_version
+
+    context.updates.append(update)
+    return TableMetadataUtil.parse_obj(updated_metadata_data)
+
+
+@apply_table_update.register(AddSchemaUpdate)
+def _(update: AddSchemaUpdate, base_metadata: TableMetadata, context: 
TableMetadataUpdateContext) -> TableMetadata:
+    def reuse_or_create_new_schema_id(new_schema: Schema) -> Tuple[int, bool]:
+        """Reuse schema id if schema already exists, otherwise create a new 
one.
+
+        Args:
+            new_schema: The new schema to be added.
+
+        Returns:
+            The new schema id and whether the schema already exists.
+        """
+        result_schema_id = base_metadata.current_schema_id
+        for schema in base_metadata.schemas:
+            if schema == new_schema:
+                return schema.schema_id, True
+            elif schema.schema_id >= result_schema_id:
+                result_schema_id = schema.schema_id + 1
+        return result_schema_id, False
+
+    if update.last_column_id < base_metadata.last_column_id:
+        raise ValueError(f"Invalid last column id {update.last_column_id}, 
must be >= {base_metadata.last_column_id}")
+
+    new_schema_id, schema_found = reuse_or_create_new_schema_id(update.schema_)
+    if schema_found and update.last_column_id == base_metadata.last_column_id:
+        if context.last_added_schema_id is not None and 
context.is_added_schema(new_schema_id):
+            context.last_added_schema_id = new_schema_id
+        return base_metadata
+
+    updated_metadata_data = copy(base_metadata.model_dump())
+    updated_metadata_data["last-column-id"] = update.last_column_id
+
+    new_schema = (
+        update.schema_
+        if new_schema_id == update.schema_.schema_id
+        else Schema(*update.schema_.fields, schema_id=new_schema_id, 
identifier_field_ids=update.schema_.identifier_field_ids)
+    )

Review Comment:
   Do we need to construct a new schema here? The `new_schema_id` should have 
been set correctly by the schema builder.



##########
pyiceberg/table/__init__.py:
##########
@@ -350,6 +357,241 @@ class RemovePropertiesUpdate(TableUpdate):
     removals: List[str]
 
 
class TableMetadataUpdateContext:
    """Tracks the updates accumulated within a single metadata transaction.

    Attributes:
        updates: The updates applied so far, in application order.
        last_added_schema_id: Id of the schema most recently added in this
            transaction, or None when no schema has been added yet.
    """

    updates: List[TableUpdate]
    last_added_schema_id: Optional[int]

    def __init__(self) -> None:
        self.updates = []
        self.last_added_schema_id = None

    def is_added_snapshot(self, snapshot_id: int) -> bool:
        """Return True if a snapshot with ``snapshot_id`` was added in this transaction."""
        for update in self.updates:
            if update.action == TableUpdateAction.add_snapshot and update.snapshot.snapshot_id == snapshot_id:
                return True
        return False

    def is_added_schema(self, schema_id: int) -> bool:
        """Return True if a schema with ``schema_id`` was added in this transaction."""
        for update in self.updates:
            if update.action == TableUpdateAction.add_schema and update.schema_.schema_id == schema_id:
                return True
        return False
+
+
@singledispatch
def apply_table_update(update: TableUpdate, base_metadata: TableMetadata, context: TableMetadataUpdateContext) -> TableMetadata:
    """Apply a single table update on top of ``base_metadata``.

    Dispatch root: concrete ``TableUpdate`` subclasses are handled by the
    implementations registered below via ``@apply_table_update.register``.

    Args:
        update: The update to apply.
        base_metadata: The metadata the update is applied to.
        context: Change-tracking state (previous updates, last added schema id,
            …) for the current transaction.

    Returns:
        The metadata with the update applied.

    Raises:
        NotImplementedError: If no handler is registered for the concrete
            type of ``update``.
    """
    raise NotImplementedError(f"Unsupported table update: {update}")
+
+
@apply_table_update.register(UpgradeFormatVersionUpdate)
def _(update: UpgradeFormatVersionUpdate, base_metadata: TableMetadata, context: TableMetadataUpdateContext) -> TableMetadata:
    """Bump the table's format version.

    The target version must be supported by this library and must not be lower
    than the table's current version; requesting the current version is a no-op.
    """
    if update.format_version > SUPPORTED_TABLE_FORMAT_VERSION:
        raise ValueError(f"Unsupported table format version: {update.format_version}")

    if update.format_version < base_metadata.format_version:
        raise ValueError(f"Cannot downgrade v{base_metadata.format_version} table to v{update.format_version}")

    # Already at the requested version: leave the metadata untouched and
    # record nothing in the context.
    if update.format_version == base_metadata.format_version:
        return base_metadata

    new_metadata_dict = copy(base_metadata.model_dump())
    new_metadata_dict["format-version"] = update.format_version

    context.updates.append(update)
    return TableMetadataUtil.parse_obj(new_metadata_dict)
+
+
@apply_table_update.register(AddSchemaUpdate)
def _(update: AddSchemaUpdate, base_metadata: TableMetadata, context: TableMetadataUpdateContext) -> TableMetadata:
    """Add a schema to the table metadata, reusing an identical existing schema when possible."""

    def _resolve_schema_id(candidate: Schema) -> Tuple[int, bool]:
        """Return ``(schema_id, already_exists)`` for ``candidate``.

        An identical existing schema keeps its id; otherwise a fresh id is
        produced: one past the highest existing id, and at least the current
        schema id.
        """
        next_id = base_metadata.current_schema_id
        for existing in base_metadata.schemas:
            if existing == candidate:
                return existing.schema_id, True
            if existing.schema_id >= next_id:
                next_id = existing.schema_id + 1
        return next_id, False

    if update.last_column_id < base_metadata.last_column_id:
        raise ValueError(f"Invalid last column id {update.last_column_id}, must be >= {base_metadata.last_column_id}")

    new_schema_id, schema_found = _resolve_schema_id(update.schema_)
    # An identical schema with an unchanged last-column-id is a no-op; only
    # refresh the "last added" pointer when that schema was added in this
    # same transaction.
    if schema_found and update.last_column_id == base_metadata.last_column_id:
        if context.last_added_schema_id is not None and context.is_added_schema(new_schema_id):
            context.last_added_schema_id = new_schema_id
        return base_metadata

    updated_metadata_data = copy(base_metadata.model_dump())
    updated_metadata_data["last-column-id"] = update.last_column_id

    if new_schema_id == update.schema_.schema_id:
        new_schema = update.schema_
    else:
        # Re-number the schema so the stored copy carries the resolved id.
        new_schema = Schema(
            *update.schema_.fields,
            schema_id=new_schema_id,
            identifier_field_ids=update.schema_.identifier_field_ids,
        )

    if not schema_found:
        updated_metadata_data["schemas"].append(new_schema.model_dump())

    context.updates.append(update)
    context.last_added_schema_id = new_schema_id
    return TableMetadataUtil.parse_obj(updated_metadata_data)
+
+
@apply_table_update.register(SetCurrentSchemaUpdate)
def _(update: SetCurrentSchemaUpdate, base_metadata: TableMetadata, context: TableMetadataUpdateContext) -> TableMetadata:
    """Point the table at a different current schema.

    A schema id of -1 is a sentinel meaning "the schema most recently added in
    this transaction"; it is resolved through the context and re-dispatched.
    """
    target_schema_id = update.schema_id

    if target_schema_id == -1:
        if context.last_added_schema_id is None:
            raise ValueError("Cannot set current schema to last added schema when no schema has been added")
        resolved = SetCurrentSchemaUpdate(schema_id=context.last_added_schema_id)
        return apply_table_update(resolved, base_metadata, context)

    # Already the current schema: nothing to do.
    if target_schema_id == base_metadata.current_schema_id:
        return base_metadata

    if base_metadata.schemas_by_id.get(target_schema_id) is None:
        raise ValueError(f"Schema with id {update.schema_id} does not exist")

    new_metadata_dict = copy(base_metadata.model_dump())
    new_metadata_dict["current-schema-id"] = target_schema_id

    # Record the sentinel form when the target is the most recently added
    # schema of this transaction; otherwise record the update as given.
    if context.last_added_schema_id is not None and context.last_added_schema_id == target_schema_id:
        context.updates.append(SetCurrentSchemaUpdate(schema_id=-1))
    else:
        context.updates.append(update)

    return TableMetadataUtil.parse_obj(new_metadata_dict)
+
+
@apply_table_update.register(AddSnapshotUpdate)
def _(update: AddSnapshotUpdate, base_metadata: TableMetadata, context: TableMetadataUpdateContext) -> TableMetadata:
    """Append a new snapshot to the table metadata.

    Validates that the table is fully initialized (schema, partition spec and
    sort order present), that the snapshot id is unused, and — for format v2 —
    that the sequence number moves forward for snapshots that have a parent.
    """
    snapshot = update.snapshot

    if len(base_metadata.schemas) == 0:
        raise ValueError("Attempting to add a snapshot before a schema is added")

    if len(base_metadata.partition_specs) == 0:
        raise ValueError("Attempting to add a snapshot before a partition spec is added")

    if len(base_metadata.sort_orders) == 0:
        raise ValueError("Attempting to add a snapshot before a sort order is added")

    if base_metadata.snapshots_by_id.get(snapshot.snapshot_id) is not None:
        raise ValueError(f"Snapshot with id {update.snapshot.snapshot_id} already exists")

    # v2 tables require monotonically increasing sequence numbers, except for
    # root snapshots (no parent).
    has_stale_sequence_number = (
        base_metadata.format_version == 2
        and snapshot.sequence_number is not None
        and snapshot.sequence_number <= base_metadata.last_sequence_number
        and snapshot.parent_snapshot_id is not None
    )
    if has_stale_sequence_number:
        raise ValueError(
            f"Cannot add snapshot with sequence number {update.snapshot.sequence_number} "
            f"older than last sequence number {base_metadata.last_sequence_number}"
        )

    new_metadata_dict = copy(base_metadata.model_dump())
    new_metadata_dict["last-updated-ms"] = snapshot.timestamp_ms
    new_metadata_dict["last-sequence-number"] = snapshot.sequence_number
    new_metadata_dict["snapshots"].append(snapshot.model_dump())

    context.updates.append(update)
    return TableMetadataUtil.parse_obj(new_metadata_dict)
+
+
@apply_table_update.register(SetSnapshotRefUpdate)
def _(update: SetSnapshotRefUpdate, base_metadata: TableMetadata, context: TableMetadataUpdateContext) -> TableMetadata:
    """Create or move a named snapshot reference (a branch or a tag).

    Validates the ref's retention settings, points the ref at an existing
    snapshot, and — for the main branch — also advances the current snapshot
    id and appends to the snapshot log.

    Raises:
        ValueError: If the ref type is missing, a retention setting is invalid
            for the ref type or out of range, or the target snapshot is unknown.
    """
    if update.type is None:
        raise ValueError("Snapshot ref type must be set")

    if update.min_snapshots_to_keep is not None and update.type == SnapshotRefType.TAG:
        # FIX: the guard rejects TAG refs, but the original message said
        # "branch refs"; reworded to match the maxSnapshotAgeMs message below.
        raise ValueError("Tags do not support setting minSnapshotsToKeep")

    if update.min_snapshots_to_keep is not None and update.min_snapshots_to_keep <= 0:
        # FIX: the guard rejects values <= 0, so the requirement is "> 0",
        # not ">= 0" as the original message claimed.
        raise ValueError("Minimum snapshots to keep must be > 0")

    if update.max_snapshot_age_ms is not None and update.type == SnapshotRefType.TAG:
        raise ValueError("Tags do not support setting maxSnapshotAgeMs")

    if update.max_snapshot_age_ms is not None and update.max_snapshot_age_ms <= 0:
        raise ValueError("Max snapshot age must be > 0 ms")

    if update.max_ref_age_ms is not None and update.max_ref_age_ms <= 0:
        raise ValueError("Max ref age must be > 0 ms")

    snapshot_ref = SnapshotRef(
        snapshot_id=update.snapshot_id,
        snapshot_ref_type=update.type,
        min_snapshots_to_keep=update.min_snapshots_to_keep,
        max_snapshot_age_ms=update.max_snapshot_age_ms,
        max_ref_age_ms=update.max_ref_age_ms,
    )

    # Setting a ref to exactly what it already is, is a no-op.
    existing_ref = base_metadata.refs.get(update.ref_name)
    if existing_ref is not None and existing_ref == snapshot_ref:
        return base_metadata

    snapshot = base_metadata.snapshots_by_id.get(snapshot_ref.snapshot_id)
    if snapshot is None:
        # FIX: SnapshotRef is constructed above without a ref_name field, so the
        # original `snapshot_ref.ref_name` would raise AttributeError instead of
        # this ValueError; the ref name lives on the update.
        raise ValueError(f"Cannot set {update.ref_name} to unknown snapshot {snapshot_ref.snapshot_id}")

    update_metadata_data = copy(base_metadata.model_dump())
    update_last_updated_ms = True
    if context.is_added_snapshot(snapshot_ref.snapshot_id):
        # The target snapshot was added in this same transaction: reuse its
        # timestamp rather than stamping "now" again.
        update_metadata_data["last-updated-ms"] = snapshot.timestamp_ms
        update_last_updated_ms = False

    if update.ref_name == MAIN_BRANCH:
        update_metadata_data["current-snapshot-id"] = snapshot_ref.snapshot_id
        if update_last_updated_ms:
            update_metadata_data["last-updated-ms"] = datetime_to_millis(datetime.datetime.now().astimezone())
        update_metadata_data["snapshot-log"].append(
            SnapshotLogEntry(
                snapshot_id=snapshot_ref.snapshot_id,
                timestamp_ms=update_metadata_data["last-updated-ms"],
            ).model_dump()
        )

    update_metadata_data["refs"][update.ref_name] = snapshot_ref.model_dump()
    context.updates.append(update)
    return TableMetadataUtil.parse_obj(update_metadata_data)
+
+
+def update_table_metadata(base_metadata: TableMetadata, updates: 
Tuple[TableUpdate, ...]) -> TableMetadata:
+    """Update the table metadata with the given updates in one transaction.
+
+    Args:
+        base_metadata: The base metadata to be updated.
+        updates: The updates in one transaction.
+
+    Returns:
+        The updated metadata.

Review Comment:
   ```suggestion
           The metadata with the updates applied.
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


Reply via email to