HonahX commented on code in PR #140: URL: https://github.com/apache/iceberg-python/pull/140#discussion_r1426255789
########## pyiceberg/catalog/glue.py: ########## @@ -247,8 +278,50 @@ def _commit_table(self, table_request: CommitTableRequest) -> CommitTableRespons Raises: NoSuchTableError: If a table with the given identifier does not exist. + CommitFailedException: If the commit failed. """ - raise NotImplementedError + identifier_tuple = self.identifier_to_tuple_without_catalog( + tuple(table_request.identifier.namespace.root + [table_request.identifier.name]) + ) + database_name, table_name = self.identifier_to_database_and_table(identifier_tuple) + + current_glue_table = self._get_glue_table(database_name=database_name, table_name=table_name) + glue_table_version_id = current_glue_table.get("VersionId") + if glue_table_version_id is None: + raise CommitFailedException(f"Cannot commit {database_name}.{table_name} because Glue table version id is missing") + current_table = self._convert_glue_to_iceberg(glue_table=current_glue_table) + base_metadata = current_table.metadata + + # Validate the update requirements + for requirement in table_request.requirements: + requirement.validate(base_metadata) + + updated_metadata = update_table_metadata(base_metadata, table_request.updates) + if updated_metadata == base_metadata: + # no changes, do nothing + return CommitTableResponse(metadata=base_metadata, metadata_location=current_table.metadata_location) + + # write new metadata + new_metadata_version = self._parse_metadata_version(current_table.metadata_location) + 1 + new_metadata_location = self._get_metadata_location(current_table.metadata.location, new_metadata_version) + self._write_metadata(updated_metadata, current_table.io, new_metadata_location) + + update_table_input = _construct_table_input( + table_name=table_name, + metadata_location=new_metadata_location, + properties=current_table.properties, + glue_table=current_glue_table, + prev_metadata_location=current_table.metadata_location, + ) + + self._update_glue_table( + database_name=database_name, + table_name=table_name, + table_input=update_table_input, + version_id=glue_table_version_id, Review Comment: The `version_id` here aims to instruct the Glue server to reject this update if the given version id does not match the current version id of this glue table. For example: ```python # current table has version id '1' glue.update_table(database_name, table_input, VersionId='1') # Update success, current table has version id '2' assert glue.get_table(database_name, table_name)['Table']['VersionId'] == '2' # Assume a concurrent update has increased table versionId to '3' glue.update_table(database_name, table_input, VersionId='2') glue.exceptions.ConcurrentModificationException: Cannot commit ... because Glue detected concurrent update ``` We can rely on this feature to achieve optimistic locking for glue catalog. Details can be found [here](https://iceberg.apache.org/docs/latest/aws/#optimistic-locking) I think the description for this argument in [AWS API Doc](https://docs.aws.amazon.com/glue/latest/webapi/API_UpdateTable.html) does not fully reflect its purpose. I will add some comment here -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org