This is an automated email from the ASF dual-hosted git repository.

fokko pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-python.git


The following commit(s) were added to refs/heads/main by this push:
     new f72e363b Refactor GlueCatalog `_commit_table` (#653)
f72e363b is described below

commit f72e363b18baa181c998bbdef657982159a22d48
Author: Honah J <[email protected]>
AuthorDate: Thu Apr 25 01:21:33 2024 -0700

    Refactor GlueCatalog `_commit_table` (#653)
    
    * refactor _commit_table
    
    * small refactor
    
    * extract common logic of _commit_table
    
    * reformat
---
 pyiceberg/catalog/__init__.py          | 22 ++++++++++
 pyiceberg/catalog/glue.py              | 79 +++++++++++++++-------------------
 tests/catalog/integration_test_glue.py | 14 ++++--
 tests/catalog/test_glue.py             | 17 ++++++--
 4 files changed, 82 insertions(+), 50 deletions(-)

diff --git a/pyiceberg/catalog/__init__.py b/pyiceberg/catalog/__init__.py
index f104aa94..18d803fe 100644
--- a/pyiceberg/catalog/__init__.py
+++ b/pyiceberg/catalog/__init__.py
@@ -48,6 +48,7 @@ from pyiceberg.table import (
     CreateTableTransaction,
     StagedTable,
     Table,
+    update_table_metadata,
 )
 from pyiceberg.table.metadata import TableMetadata, TableMetadataV1, new_table_metadata
 from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder
@@ -728,6 +729,27 @@ class MetastoreCatalog(Catalog, ABC):
             catalog=self,
         )
 
+    def _update_and_stage_table(self, current_table: Optional[Table], table_request: CommitTableRequest) -> StagedTable:
+        for requirement in table_request.requirements:
+            requirement.validate(current_table.metadata if current_table else None)
+
+        updated_metadata = update_table_metadata(
+            base_metadata=current_table.metadata if current_table else self._empty_table_metadata(),
+            updates=table_request.updates,
+            enforce_validation=current_table is None,
+        )
+
+        new_metadata_version = self._parse_metadata_version(current_table.metadata_location) + 1 if current_table else 0
+        new_metadata_location = self._get_metadata_location(updated_metadata.location, new_metadata_version)
+
+        return StagedTable(
+            identifier=tuple(table_request.identifier.namespace.root + [table_request.identifier.name]),
+            metadata=updated_metadata,
+            metadata_location=new_metadata_location,
+            io=self._load_file_io(properties=updated_metadata.properties, location=new_metadata_location),
+            catalog=self,
+        )
+
     def _get_updated_props_and_update_summary(
         self, current_properties: Properties, removals: Optional[Set[str]], updates: Properties
     ) -> Tuple[PropertiesUpdateSummary, Properties]:
diff --git a/pyiceberg/catalog/glue.py b/pyiceberg/catalog/glue.py
index c3c2fdaf..275cda7e 100644
--- a/pyiceberg/catalog/glue.py
+++ b/pyiceberg/catalog/glue.py
@@ -58,7 +58,6 @@ from pyiceberg.exceptions import (
     NoSuchTableError,
     TableAlreadyExistsError,
 )
-from pyiceberg.io import load_file_io
 from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec
 from pyiceberg.schema import Schema, SchemaVisitor, visit
 from pyiceberg.serializers import FromInputFile
@@ -67,7 +66,6 @@ from pyiceberg.table import (
     CommitTableResponse,
     PropertyUtil,
     Table,
-    update_table_metadata,
 )
 from pyiceberg.table.metadata import TableMetadata
 from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder
@@ -321,7 +319,7 @@ class GlueCatalog(MetastoreCatalog):
             )
         metadata_location = properties[METADATA_LOCATION]
 
-        io = load_file_io(properties=self.properties, location=metadata_location)
+        io = self._load_file_io(location=metadata_location)
         file = io.new_input(metadata_location)
         metadata = FromInputFile.table_metadata(file)
         return Table(
@@ -439,71 +437,64 @@ class GlueCatalog(MetastoreCatalog):
         )
         database_name, table_name = self.identifier_to_database_and_table(identifier_tuple)
 
+        current_glue_table: Optional[TableTypeDef]
+        glue_table_version_id: Optional[str]
+        current_table: Optional[Table]
         try:
             current_glue_table = self._get_glue_table(database_name=database_name, table_name=table_name)
-            # Update the table
             glue_table_version_id = current_glue_table.get("VersionId")
+            current_table = self._convert_glue_to_iceberg(glue_table=current_glue_table)
+        except NoSuchTableError:
+            current_glue_table = None
+            glue_table_version_id = None
+            current_table = None
+
+        updated_staged_table = self._update_and_stage_table(current_table, table_request)
+        if current_table and updated_staged_table.metadata == current_table.metadata:
+            # no changes, do nothing
+            return CommitTableResponse(metadata=current_table.metadata, metadata_location=current_table.metadata_location)
+        self._write_metadata(
+            metadata=updated_staged_table.metadata,
+            io=updated_staged_table.io,
+            metadata_path=updated_staged_table.metadata_location,
+        )
+
+        if current_table:
+            # table exists, update the table
             if not glue_table_version_id:
                 raise CommitFailedException(
                     f"Cannot commit {database_name}.{table_name} because Glue table version id is missing"
                 )
-            current_table = self._convert_glue_to_iceberg(glue_table=current_glue_table)
-            base_metadata = current_table.metadata
-
-            # Validate the update requirements
-            for requirement in table_request.requirements:
-                requirement.validate(base_metadata)
-
-            updated_metadata = update_table_metadata(base_metadata=base_metadata, updates=table_request.updates)
-            if updated_metadata == base_metadata:
-                # no changes, do nothing
-                return CommitTableResponse(metadata=base_metadata, metadata_location=current_table.metadata_location)
-
-            # write new metadata
-            new_metadata_version = self._parse_metadata_version(current_table.metadata_location) + 1
-            new_metadata_location = self._get_metadata_location(current_table.metadata.location, new_metadata_version)
-            self._write_metadata(updated_metadata, current_table.io, new_metadata_location)
 
+            # Pass `version_id` to implement optimistic locking: it ensures updates are rejected if concurrent
+            # modifications occur. See more details at https://iceberg.apache.org/docs/latest/aws/#optimistic-locking
             update_table_input = _construct_table_input(
                 table_name=table_name,
-                metadata_location=new_metadata_location,
-                properties=current_table.properties,
-                metadata=updated_metadata,
+                metadata_location=updated_staged_table.metadata_location,
+                properties=updated_staged_table.properties,
+                metadata=updated_staged_table.metadata,
                 glue_table=current_glue_table,
                 prev_metadata_location=current_table.metadata_location,
             )
-
-            # Pass `version_id` to implement optimistic locking: it ensures updates are rejected if concurrent
-            # modifications occur. See more details at https://iceberg.apache.org/docs/latest/aws/#optimistic-locking
             self._update_glue_table(
                 database_name=database_name,
                 table_name=table_name,
                 table_input=update_table_input,
                 version_id=glue_table_version_id,
             )
-
-            return CommitTableResponse(metadata=updated_metadata, metadata_location=new_metadata_location)
-        except NoSuchTableError:
-            # Create the table
-            updated_metadata = update_table_metadata(
-                base_metadata=self._empty_table_metadata(), updates=table_request.updates, enforce_validation=True
-            )
-            new_metadata_version = 0
-            new_metadata_location = self._get_metadata_location(updated_metadata.location, new_metadata_version)
-            self._write_metadata(
-                updated_metadata, self._load_file_io(updated_metadata.properties, new_metadata_location), new_metadata_location
-            )
-
+        else:
+            # table does not exist, create the table
             create_table_input = _construct_table_input(
                 table_name=table_name,
-                metadata_location=new_metadata_location,
-                properties=updated_metadata.properties,
-                metadata=updated_metadata,
+                metadata_location=updated_staged_table.metadata_location,
+                properties=updated_staged_table.properties,
+                metadata=updated_staged_table.metadata,
             )
-
             self._create_glue_table(database_name=database_name, table_name=table_name, table_input=create_table_input)
 
-            return CommitTableResponse(metadata=updated_metadata, metadata_location=new_metadata_location)
+        return CommitTableResponse(
+            metadata=updated_staged_table.metadata, metadata_location=updated_staged_table.metadata_location
+        )
 
     def load_table(self, identifier: Union[str, Identifier]) -> Table:
         """Load the table's metadata and returns the table instance.
diff --git a/tests/catalog/integration_test_glue.py b/tests/catalog/integration_test_glue.py
index 5cd60225..393271c6 100644
--- a/tests/catalog/integration_test_glue.py
+++ b/tests/catalog/integration_test_glue.py
@@ -462,7 +462,9 @@ def test_commit_table_update_schema(
     ]
 
 
-def test_commit_table_properties(test_catalog: Catalog, table_schema_nested: Schema, database_name: str, table_name: str) -> None:
+def test_commit_table_properties(
+    test_catalog: Catalog, glue: boto3.client, table_schema_nested: Schema, database_name: str, table_name: str
+) -> None:
     identifier = (database_name, table_name)
     test_catalog.create_namespace(namespace=database_name)
     table = test_catalog.create_table(identifier=identifier, schema=table_schema_nested, properties={"test_a": "test_a"})
@@ -470,13 +472,19 @@ def test_commit_table_properties(test_catalog: Catalog, table_schema_nested: Sch
     assert MetastoreCatalog._parse_metadata_version(table.metadata_location) == 0
 
     transaction = table.transaction()
-    transaction.set_properties(test_a="test_aa", test_b="test_b", test_c="test_c")
+    transaction.set_properties(test_a="test_aa", test_b="test_b", test_c="test_c", Description="test_description")
     transaction.remove_properties("test_b")
     transaction.commit_transaction()
 
     updated_table_metadata = table.metadata
     assert MetastoreCatalog._parse_metadata_version(table.metadata_location) == 1
-    assert updated_table_metadata.properties == {"test_a": "test_aa", "test_c": "test_c"}
+    assert updated_table_metadata.properties == {'Description': 'test_description', "test_a": "test_aa", "test_c": "test_c"}
+
+    table_info = glue.get_table(
+        DatabaseName=database_name,
+        Name=table_name,
+    )
+    assert table_info["Table"]["Description"] == "test_description"
 
 
 @pytest.mark.parametrize("format_version", [1, 2])
diff --git a/tests/catalog/test_glue.py b/tests/catalog/test_glue.py
index 8aa49186..7b12261b 100644
--- a/tests/catalog/test_glue.py
+++ b/tests/catalog/test_glue.py
@@ -677,7 +677,12 @@ def test_commit_table_update_schema(
 
 @mock_aws
 def test_commit_table_properties(
-    _bucket_initialize: None, moto_endpoint_url: str, table_schema_nested: Schema, database_name: str, table_name: str
+    _glue: boto3.client,
+    _bucket_initialize: None,
+    moto_endpoint_url: str,
+    table_schema_nested: Schema,
+    database_name: str,
+    table_name: str,
 ) -> None:
     catalog_name = "glue"
     identifier = (database_name, table_name)
@@ -688,13 +693,19 @@ def test_commit_table_properties(
     assert test_catalog._parse_metadata_version(table.metadata_location) == 0
 
     transaction = table.transaction()
-    transaction.set_properties(test_a="test_aa", test_b="test_b", test_c="test_c")
+    transaction.set_properties(test_a="test_aa", test_b="test_b", test_c="test_c", Description="test_description")
     transaction.remove_properties("test_b")
     transaction.commit_transaction()
 
     updated_table_metadata = table.metadata
     assert test_catalog._parse_metadata_version(table.metadata_location) == 1
-    assert updated_table_metadata.properties == {"test_a": "test_aa", "test_c": "test_c"}
+    assert updated_table_metadata.properties == {'Description': 'test_description', "test_a": "test_aa", "test_c": "test_c"}
+
+    table_info = _glue.get_table(
+        DatabaseName=database_name,
+        Name=table_name,
+    )
+    assert table_info["Table"]["Description"] == "test_description"
 
 
 @mock_aws

Reply via email to