HonahX commented on code in PR #140:
URL: https://github.com/apache/iceberg-python/pull/140#discussion_r1426255789
##########
pyiceberg/catalog/glue.py:
##########
@@ -247,8 +278,50 @@ def _commit_table(self, table_request: CommitTableRequest)
-> CommitTableRespons
Raises:
NoSuchTableError: If a table with the given identifier does not
exist.
+ CommitFailedException: If the commit failed.
"""
- raise NotImplementedError
+ identifier_tuple = self.identifier_to_tuple_without_catalog(
+ tuple(table_request.identifier.namespace.root +
[table_request.identifier.name])
+ )
+ database_name, table_name =
self.identifier_to_database_and_table(identifier_tuple)
+
+ current_glue_table = self._get_glue_table(database_name=database_name,
table_name=table_name)
+ glue_table_version_id = current_glue_table.get("VersionId")
+ if glue_table_version_id is None:
+ raise CommitFailedException(f"Cannot commit
{database_name}.{table_name} because Glue table version id is missing")
+ current_table =
self._convert_glue_to_iceberg(glue_table=current_glue_table)
+ base_metadata = current_table.metadata
+
+ # Validate the update requirements
+ for requirement in table_request.requirements:
+ requirement.validate(base_metadata)
+
+ updated_metadata = update_table_metadata(base_metadata,
table_request.updates)
+ if updated_metadata == base_metadata:
+ # no changes, do nothing
+ return CommitTableResponse(metadata=base_metadata,
metadata_location=current_table.metadata_location)
+
+ # write new metadata
+ new_metadata_version =
self._parse_metadata_version(current_table.metadata_location) + 1
+ new_metadata_location =
self._get_metadata_location(current_table.metadata.location,
new_metadata_version)
+ self._write_metadata(updated_metadata, current_table.io,
new_metadata_location)
+
+ update_table_input = _construct_table_input(
+ table_name=table_name,
+ metadata_location=new_metadata_location,
+ properties=current_table.properties,
+ glue_table=current_glue_table,
+ prev_metadata_location=current_table.metadata_location,
+ )
+
+ self._update_glue_table(
+ database_name=database_name,
+ table_name=table_name,
+ table_input=update_table_input,
+ version_id=glue_table_version_id,
Review Comment:
The `version_id` here instructs the Glue service to reject this update
if the given version id does not match the current version id of the Glue
table. For example:
```python
# current table has version id '1'
glue.update_table(database_name, table_input, VersionId='1')
# update succeeds; the current table now has version id '2'
assert glue.get_table(database_name, table_name)['Table']['VersionId'] == '2'
# Assume a concurrent update has increased table versionId to '3'
glue.update_table(database_name, table_input, VersionId='2')
# raises pyiceberg.exceptions.CommitFailedException: Cannot commit ... because Glue
# detected concurrent update
```
We can rely on this feature to achieve optimistic locking for the Glue catalog.
Details can be found
[here](https://iceberg.apache.org/docs/latest/aws/#optimistic-locking).
I think the description of this argument in the [AWS API
Doc](https://docs.aws.amazon.com/glue/latest/webapi/API_UpdateTable.html) does
not fully reflect its purpose. I will add a comment here.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]