This is an automated email from the ASF dual-hosted git repository.

fokko pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-python.git


The following commit(s) were added to refs/heads/main by this push:
     new 6020f24a Add transaction tests to catalog integration tests (#2371)
6020f24a is described below

commit 6020f24a7989808d382ccc3864205f9f1272bdd7
Author: Gabriel Igliozzi <[email protected]>
AuthorDate: Thu Oct 9 15:06:54 2025 -0400

    Add transaction tests to catalog integration tests (#2371)
    
    Just adding more tests to the catalog tests.
---
 tests/integration/test_catalog.py | 148 +++++++++++++++++++++++++++++++++++++-
 1 file changed, 147 insertions(+), 1 deletion(-)

diff --git a/tests/integration/test_catalog.py b/tests/integration/test_catalog.py
index b57ab983..b7d2cefa 100644
--- a/tests/integration/test_catalog.py
+++ b/tests/integration/test_catalog.py
@@ -27,6 +27,7 @@ from pyiceberg.catalog.memory import InMemoryCatalog
 from pyiceberg.catalog.rest import RestCatalog
 from pyiceberg.catalog.sql import SqlCatalog
 from pyiceberg.exceptions import (
+    CommitFailedException,
     NamespaceAlreadyExistsError,
     NamespaceNotEmptyError,
     NoSuchNamespaceError,
@@ -34,7 +35,12 @@ from pyiceberg.exceptions import (
     TableAlreadyExistsError,
 )
 from pyiceberg.io import WAREHOUSE
-from pyiceberg.schema import Schema
+from pyiceberg.partitioning import PartitionField, PartitionSpec
+from pyiceberg.schema import INITIAL_SCHEMA_ID, Schema
+from pyiceberg.table.metadata import INITIAL_SPEC_ID
+from pyiceberg.table.sorting import INITIAL_SORT_ORDER_ID, SortField, SortOrder
+from pyiceberg.transforms import DayTransform, IdentityTransform
+from pyiceberg.types import IntegerType, LongType, NestedField, TimestampType, UUIDType
 from tests.conftest import clean_up
 
 
@@ -259,6 +265,146 @@ def test_table_exists(test_catalog: Catalog, table_schema_nested: Schema, databa
     assert test_catalog.table_exists((database_name, table_name)) is True
 
 
+@pytest.mark.integration
+@pytest.mark.parametrize("test_catalog", CATALOGS)
+def test_update_table_transaction(test_catalog: Catalog, test_schema: Schema, table_name: str, database_name: str) -> None:
+    identifier = (database_name, table_name)
+
+    test_catalog.create_namespace(database_name)
+    table = test_catalog.create_table(identifier, test_schema)
+    assert test_catalog.table_exists(identifier)
+
+    expected_schema = Schema(
+        NestedField(1, "VendorID", IntegerType(), False),
+        NestedField(2, "tpep_pickup_datetime", TimestampType(), False),
+        NestedField(3, "new_col", IntegerType(), False),
+    )
+
+    expected_spec = PartitionSpec(PartitionField(3, 1000, IdentityTransform(), "new_col"))
+
+    with table.transaction() as transaction:
+        with transaction.update_schema() as update_schema:
+            update_schema.add_column("new_col", IntegerType())
+
+        with transaction.update_spec() as update_spec:
+            update_spec.add_field("new_col", IdentityTransform())
+
+    table = test_catalog.load_table(identifier)
+    assert table.schema().as_struct() == expected_schema.as_struct()
+    assert table.spec().fields == expected_spec.fields
+
+
+@pytest.mark.integration
+@pytest.mark.parametrize("test_catalog", CATALOGS)
+def test_update_schema_conflict(test_catalog: Catalog, test_schema: Schema, table_name: str, database_name: str) -> None:
+    if isinstance(test_catalog, HiveCatalog):
+        pytest.skip("HiveCatalog fails in this test, need to investigate")
+
+    identifier = (database_name, table_name)
+
+    test_catalog.create_namespace(database_name)
+    table = test_catalog.create_table(identifier, test_schema)
+    assert test_catalog.table_exists(identifier)
+
+    original_update = table.update_schema().add_column("new_col", LongType())
+
+    # Update schema concurrently so that the original update fails
+    concurrent_update = test_catalog.load_table(identifier).update_schema().delete_column("VendorID")
+    concurrent_update.commit()
+
+    expected_schema = Schema(NestedField(2, "tpep_pickup_datetime", TimestampType(), False))
+
+    with pytest.raises(CommitFailedException):
+        original_update.commit()
+
+    table = test_catalog.load_table(identifier)
+    assert table.schema().as_struct() == expected_schema.as_struct()
+
+
+@pytest.mark.integration
+@pytest.mark.parametrize("test_catalog", CATALOGS)
+def test_create_table_transaction_simple(test_catalog: Catalog, test_schema: Schema, table_name: str, database_name: str) -> None:
+    identifier = (database_name, table_name)
+
+    test_catalog.create_namespace(database_name)
+    table_transaction = test_catalog.create_table_transaction(identifier, test_schema)
+    assert not test_catalog.table_exists(identifier)
+
+    table_transaction.update_schema().add_column("new_col", IntegerType()).commit()
+    assert not test_catalog.table_exists(identifier)
+
+    table_transaction.commit_transaction()
+    assert test_catalog.table_exists(identifier)
+
+    table = test_catalog.load_table(identifier)
+    assert table.schema().find_type("new_col").is_primitive
+
+
+@pytest.mark.integration
+@pytest.mark.parametrize("test_catalog", CATALOGS)
+def test_create_table_transaction_multiple_schemas(
+    test_catalog: Catalog, test_schema: Schema, test_partition_spec: PartitionSpec, table_name: str, database_name: str
+) -> None:
+    identifier = (database_name, table_name)
+
+    test_catalog.create_namespace(database_name)
+    table_transaction = test_catalog.create_table_transaction(
+        identifier=identifier,
+        schema=test_schema,
+        partition_spec=test_partition_spec,
+        sort_order=SortOrder(SortField(source_id=1)),
+    )
+    assert not test_catalog.table_exists(identifier)
+
+    table_transaction.update_schema().add_column("new_col", IntegerType()).commit()
+    assert not test_catalog.table_exists(identifier)
+
+    table_transaction.update_schema().add_column("new_col_1", UUIDType()).commit()
+    assert not test_catalog.table_exists(identifier)
+
+    table_transaction.update_spec().add_field("new_col", IdentityTransform()).commit()
+    assert not test_catalog.table_exists(identifier)
+
+    # TODO: test replace sort order when available
+
+    expected_schema = Schema(
+        NestedField(1, "VendorID", IntegerType(), False),
+        NestedField(2, "tpep_pickup_datetime", TimestampType(), False),
+        NestedField(3, "new_col", IntegerType(), False),
+        NestedField(4, "new_col_1", UUIDType(), False),
+    )
+
+    expected_spec = PartitionSpec(
+        PartitionField(1, 1000, IdentityTransform(), "VendorID"),
+        PartitionField(2, 1001, DayTransform(), "tpep_pickup_day"),
+        PartitionField(3, 1002, IdentityTransform(), "new_col"),
+    )
+
+    table_transaction.commit_transaction()
+    assert test_catalog.table_exists(identifier)
+
+    table = test_catalog.load_table(identifier)
+    assert table.schema().as_struct() == expected_schema.as_struct()
+    assert table.schema().schema_id == INITIAL_SCHEMA_ID + 2
+    assert table.spec().fields == expected_spec.fields
+    assert table.spec().spec_id == INITIAL_SPEC_ID + 1
+    assert table.sort_order().order_id == INITIAL_SORT_ORDER_ID
+
+
+@pytest.mark.integration
+@pytest.mark.parametrize("test_catalog", CATALOGS)
+def test_concurrent_create_transaction(test_catalog: Catalog, test_schema: Schema, table_name: str, database_name: str) -> None:
+    identifier = (database_name, table_name)
+
+    test_catalog.create_namespace(database_name)
+    table = test_catalog.create_table_transaction(identifier=identifier, schema=test_schema)
+    assert not test_catalog.table_exists(identifier)
+
+    test_catalog.create_table(identifier, test_schema)
+    with pytest.raises(CommitFailedException):
+        table.commit_transaction()
+
+
 @pytest.mark.integration
 @pytest.mark.parametrize("test_catalog", CATALOGS)
 def test_create_namespace(test_catalog: Catalog, database_name: str) -> None:

Reply via email to