This is an automated email from the ASF dual-hosted git repository.

fokko pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-python.git


The following commit(s) were added to refs/heads/main by this push:
     new b3a8027f Consolidating InMemoryCatalog and SqlCatalog tests (#2906)
b3a8027f is described below

commit b3a8027f037c41dd3bd68f15756feeee3b9b4a3c
Author: Neelesh Salian <[email protected]>
AuthorDate: Tue Jan 13 13:59:29 2026 -0800

    Consolidating InMemoryCatalog and SqlCatalog tests (#2906)
    
    Closes #2870
    
    # Rationale for this change
    
    Eliminates code duplication between `test_base.py` and `test_sql.py` by
    consolidating shared catalog behavior tests into a single parameterized
    test suite. This ensures consistent behavior testing across both catalog
    implementations and makes it easier to add new catalog implementations
    with shared test coverage.
    
    ## Changes
    - New `tests/catalog/test_catalog_behaviors.py` with parameterized tests
    covering both catalogs
    - Added catalog fixtures to `tests/conftest.py`
    - Reduced `test_base.py` to InMemoryCatalog-specific tests
    - Reduced `test_sql.py` to SqlCatalog-specific tests
    - Fixed return type annotation for `test_partition_spec` fixture in
    `tests/conftest.py`
    - Removed an import from `tests/table/test_upsert.py`
    
    ## Are these changes tested?
    
    `make test` ran successfully locally
    
    ## Are there any user-facing changes?
    
    No
    
    <!-- In the case of user-facing changes, please add the changelog label.
    -->
---
 tests/catalog/test_base.py              |  578 +-----------
 tests/catalog/test_catalog_behaviors.py | 1192 ++++++++++++++++++++++++
 tests/catalog/test_sql.py               | 1509 +------------------------------
 tests/conftest.py                       |  173 +++-
 tests/table/test_upsert.py              |    4 +-
 5 files changed, 1360 insertions(+), 2096 deletions(-)

diff --git a/tests/catalog/test_base.py b/tests/catalog/test_base.py
index d91bfcdb..1a474783 100644
--- a/tests/catalog/test_base.py
+++ b/tests/catalog/test_base.py
@@ -20,35 +20,13 @@
 from collections.abc import Generator
 from pathlib import PosixPath
 
-import pyarrow as pa
 import pytest
-from pydantic_core import ValidationError
-from pytest_lazyfixture import lazy_fixture
 
 from pyiceberg.catalog import Catalog, load_catalog
 from pyiceberg.catalog.memory import InMemoryCatalog
-from pyiceberg.exceptions import (
-    NamespaceAlreadyExistsError,
-    NamespaceNotEmptyError,
-    NoSuchNamespaceError,
-    NoSuchTableError,
-    TableAlreadyExistsError,
-)
 from pyiceberg.io import WAREHOUSE
-from pyiceberg.io.pyarrow import schema_to_pyarrow
-from pyiceberg.partitioning import PartitionField, PartitionSpec
 from pyiceberg.schema import Schema
-from pyiceberg.table import (
-    Table,
-    TableProperties,
-)
-from pyiceberg.table.update import (
-    AddSchemaUpdate,
-    SetCurrentSchemaUpdate,
-)
-from pyiceberg.transforms import IdentityTransform
-from pyiceberg.typedef import EMPTY_DICT, Properties
-from pyiceberg.types import IntegerType, LongType, NestedField, StringType
+from pyiceberg.types import NestedField, StringType
 
 
 @pytest.fixture
@@ -58,38 +36,6 @@ def catalog(tmp_path: PosixPath) -> Generator[Catalog, None, None]:
     catalog.close()
 
 
-TEST_TABLE_IDENTIFIER = ("com", "organization", "department", "my_table")
-TEST_TABLE_NAMESPACE = ("com", "organization", "department")
-TEST_TABLE_NAME = "my_table"
-TEST_TABLE_SCHEMA = Schema(
-    NestedField(1, "x", LongType(), required=True),
-    NestedField(2, "y", LongType(), doc="comment", required=True),
-    NestedField(3, "z", LongType(), required=True),
-)
-TEST_TABLE_PARTITION_SPEC = PartitionSpec(PartitionField(name="x", 
transform=IdentityTransform(), source_id=1, field_id=1000))
-TEST_TABLE_PROPERTIES = {"key1": "value1", "key2": "value2"}
-NO_SUCH_TABLE_ERROR = "Table does not exist: 
com.organization.department.my_table"
-TABLE_ALREADY_EXISTS_ERROR = "Table com.organization.department.my_table 
already exists"
-NAMESPACE_ALREADY_EXISTS_ERROR = "Namespace \\('com', 'organization', 
'department'\\) already exists"
-# TODO: consolidate namespace error messages then remove this
-DROP_NOT_EXISTING_NAMESPACE_ERROR = "Namespace does not exist: \\('com', 
'organization', 'department'\\)"
-NO_SUCH_NAMESPACE_ERROR = "Namespace com.organization.department does not 
exists"
-NAMESPACE_NOT_EMPTY_ERROR = "Namespace com.organization.department is not 
empty"
-
-
-def given_catalog_has_a_table(
-    catalog: InMemoryCatalog,
-    properties: Properties = EMPTY_DICT,
-) -> Table:
-    catalog.create_namespace(TEST_TABLE_NAMESPACE)
-    return catalog.create_table(
-        identifier=TEST_TABLE_IDENTIFIER,
-        schema=TEST_TABLE_SCHEMA,
-        partition_spec=TEST_TABLE_PARTITION_SPEC,
-        properties=properties or TEST_TABLE_PROPERTIES,
-    )
-
-
 def test_load_catalog_in_memory() -> None:
     assert load_catalog("catalog", type="in-memory")
 
@@ -118,533 +64,11 @@ def test_load_catalog_has_type_and_impl() -> None:
     )
 
 
-def test_namespace_from_tuple() -> None:
-    # Given
-    identifier = ("com", "organization", "department", "my_table")
-    # When
-    namespace_from = Catalog.namespace_from(identifier)
-    # Then
-    assert namespace_from == ("com", "organization", "department")
-
-
-def test_namespace_from_str() -> None:
-    # Given
-    identifier = "com.organization.department.my_table"
-    # When
-    namespace_from = Catalog.namespace_from(identifier)
-    # Then
-    assert namespace_from == ("com", "organization", "department")
-
-
-def test_name_from_tuple() -> None:
-    # Given
-    identifier = ("com", "organization", "department", "my_table")
-    # When
-    name_from = Catalog.table_name_from(identifier)
-    # Then
-    assert name_from == "my_table"
-
-
-def test_name_from_str() -> None:
-    # Given
-    identifier = "com.organization.department.my_table"
-    # When
-    name_from = Catalog.table_name_from(identifier)
-    # Then
-    assert name_from == "my_table"
-
-
-def test_create_table(catalog: InMemoryCatalog) -> None:
-    catalog.create_namespace(TEST_TABLE_NAMESPACE)
-    table = catalog.create_table(
-        identifier=TEST_TABLE_IDENTIFIER,
-        schema=TEST_TABLE_SCHEMA,
-        partition_spec=TEST_TABLE_PARTITION_SPEC,
-        properties=TEST_TABLE_PROPERTIES,
-    )
-    assert catalog.load_table(TEST_TABLE_IDENTIFIER) == table
-
-
-def test_create_table_location_override(catalog: InMemoryCatalog) -> None:
-    new_location = f"{catalog._warehouse_location}/new_location"
-    catalog.create_namespace(TEST_TABLE_NAMESPACE)
-    table = catalog.create_table(
-        identifier=TEST_TABLE_IDENTIFIER,
-        schema=TEST_TABLE_SCHEMA,
-        location=new_location,
-        partition_spec=TEST_TABLE_PARTITION_SPEC,
-        properties=TEST_TABLE_PROPERTIES,
-    )
-    assert catalog.load_table(TEST_TABLE_IDENTIFIER) == table
-    assert table.location() == new_location
-
-
-def test_create_table_removes_trailing_slash_from_location(catalog: 
InMemoryCatalog) -> None:
-    new_location = f"{catalog._warehouse_location}/new_location"
-    catalog.create_namespace(TEST_TABLE_NAMESPACE)
-    table = catalog.create_table(
-        identifier=TEST_TABLE_IDENTIFIER,
-        schema=TEST_TABLE_SCHEMA,
-        location=f"{new_location}/",
-        partition_spec=TEST_TABLE_PARTITION_SPEC,
-        properties=TEST_TABLE_PROPERTIES,
-    )
-    assert catalog.load_table(TEST_TABLE_IDENTIFIER) == table
-    assert table.location() == new_location
-
-
-@pytest.mark.parametrize(
-    "schema,expected",
-    [
-        (lazy_fixture("pyarrow_schema_simple_without_ids"), 
lazy_fixture("iceberg_schema_simple_no_ids")),
-        (lazy_fixture("iceberg_schema_simple"), 
lazy_fixture("iceberg_schema_simple")),
-        (lazy_fixture("iceberg_schema_nested"), 
lazy_fixture("iceberg_schema_nested")),
-        (lazy_fixture("pyarrow_schema_nested_without_ids"), 
lazy_fixture("iceberg_schema_nested_no_ids")),
-    ],
-)
-def test_convert_schema_if_needed(
-    schema: Schema | pa.Schema,
-    expected: Schema,
-    catalog: InMemoryCatalog,
-) -> None:
-    assert expected == catalog._convert_schema_if_needed(schema)
-
-
-def test_convert_schema_if_needed_rejects_null_type(catalog: InMemoryCatalog) 
-> None:
-    schema = pa.schema([pa.field("n1", pa.null())])
-    with pytest.raises(ValueError) as exc_info:
-        catalog._convert_schema_if_needed(schema)
-    message = str(exc_info.value)
-    assert "Null type" in message
-    assert "n1" in message
-    assert "format-version=3" in message
-
-
-def test_create_table_pyarrow_schema(catalog: InMemoryCatalog, 
pyarrow_schema_simple_without_ids: pa.Schema) -> None:
-    catalog.create_namespace(TEST_TABLE_NAMESPACE)
-    table = catalog.create_table(
-        identifier=TEST_TABLE_IDENTIFIER,
-        schema=pyarrow_schema_simple_without_ids,
-        properties=TEST_TABLE_PROPERTIES,
-    )
-    assert catalog.load_table(TEST_TABLE_IDENTIFIER) == table
-
-
-def test_create_table_raises_error_when_table_already_exists(catalog: 
InMemoryCatalog) -> None:
-    # Given
-    given_catalog_has_a_table(catalog)
-    # When
-    with pytest.raises(TableAlreadyExistsError, 
match=TABLE_ALREADY_EXISTS_ERROR):
-        catalog.create_table(
-            identifier=TEST_TABLE_IDENTIFIER,
-            schema=TEST_TABLE_SCHEMA,
-        )
-
-
-def test_load_table(catalog: InMemoryCatalog) -> None:
-    # Given
-    given_table = given_catalog_has_a_table(catalog)
-    # When
-    table = catalog.load_table(TEST_TABLE_IDENTIFIER)
-    # Then
-    assert table == given_table
-
-
-def test_load_table_from_self_identifier(catalog: InMemoryCatalog) -> None:
-    # Given
-    given_table = given_catalog_has_a_table(catalog)
-    # When
-    intermediate = catalog.load_table(TEST_TABLE_IDENTIFIER)
-    table = catalog.load_table(intermediate._identifier)
-    # Then
-    assert table == given_table
-
-
-def test_table_raises_error_on_table_not_found(catalog: InMemoryCatalog) -> 
None:
-    with pytest.raises(NoSuchTableError, match=NO_SUCH_TABLE_ERROR):
-        catalog.load_table(TEST_TABLE_IDENTIFIER)
-
-
-def test_table_exists(catalog: InMemoryCatalog) -> None:
-    # Given
-    given_catalog_has_a_table(catalog)
-    # Then
-    assert catalog.table_exists(TEST_TABLE_IDENTIFIER)
-
-
-def test_table_exists_on_table_not_found(catalog: InMemoryCatalog) -> None:
-    assert not catalog.table_exists(TEST_TABLE_IDENTIFIER)
-
-
-def test_drop_table(catalog: InMemoryCatalog) -> None:
-    # Given
-    given_catalog_has_a_table(catalog)
-    # When
-    catalog.drop_table(TEST_TABLE_IDENTIFIER)
-    # Then
-    with pytest.raises(NoSuchTableError, match=NO_SUCH_TABLE_ERROR):
-        catalog.load_table(TEST_TABLE_IDENTIFIER)
-
-
-def test_drop_table_from_self_identifier(catalog: InMemoryCatalog) -> None:
-    # Given
-    table = given_catalog_has_a_table(catalog)
-    # When
-    catalog.drop_table(table._identifier)
-    # Then
-    with pytest.raises(NoSuchTableError, match=NO_SUCH_TABLE_ERROR):
-        catalog.load_table(table._identifier)
-    with pytest.raises(NoSuchTableError, match=NO_SUCH_TABLE_ERROR):
-        catalog.load_table(TEST_TABLE_IDENTIFIER)
-
-
-def test_drop_table_that_does_not_exist_raise_error(catalog: InMemoryCatalog) 
-> None:
-    with pytest.raises(NoSuchTableError, match=NO_SUCH_TABLE_ERROR):
-        catalog.load_table(TEST_TABLE_IDENTIFIER)
-
-
-def test_purge_table(catalog: InMemoryCatalog) -> None:
-    # Given
-    given_catalog_has_a_table(catalog)
-    # When
-    catalog.purge_table(TEST_TABLE_IDENTIFIER)
-    # Then
-    with pytest.raises(NoSuchTableError, match=NO_SUCH_TABLE_ERROR):
-        catalog.load_table(TEST_TABLE_IDENTIFIER)
-
-
-def test_rename_table(catalog: InMemoryCatalog) -> None:
-    # Given
-    given_catalog_has_a_table(catalog)
-
-    # When
-    new_table = "new.namespace.new_table"
-    catalog.create_namespace(("new", "namespace"))
-    table = catalog.rename_table(TEST_TABLE_IDENTIFIER, new_table)
-
-    # Then
-    assert table._identifier == Catalog.identifier_to_tuple(new_table)
-
-    # And
-    table = catalog.load_table(new_table)
-    assert table._identifier == Catalog.identifier_to_tuple(new_table)
-
-    # And
-    assert catalog._namespace_exists(table._identifier[:-1])
-
-    # And
-    with pytest.raises(NoSuchTableError, match=NO_SUCH_TABLE_ERROR):
-        catalog.load_table(TEST_TABLE_IDENTIFIER)
-
-
-def test_rename_table_from_self_identifier(catalog: InMemoryCatalog) -> None:
-    # Given
-    table = given_catalog_has_a_table(catalog)
-
-    # When
-    new_table_name = "new.namespace.new_table"
-    catalog.create_namespace(("new", "namespace"))
-    new_table = catalog.rename_table(table._identifier, new_table_name)
-
-    # Then
-    assert new_table._identifier == Catalog.identifier_to_tuple(new_table_name)
-
-    # And
-    new_table = catalog.load_table(new_table._identifier)
-    assert new_table._identifier == Catalog.identifier_to_tuple(new_table_name)
-
-    # And
-    assert catalog._namespace_exists(new_table._identifier[:-1])
-
-    # And
-    with pytest.raises(NoSuchTableError, match=NO_SUCH_TABLE_ERROR):
-        catalog.load_table(table._identifier)
-    with pytest.raises(NoSuchTableError, match=NO_SUCH_TABLE_ERROR):
-        catalog.load_table(TEST_TABLE_IDENTIFIER)
-
-
-def test_create_namespace(catalog: InMemoryCatalog) -> None:
-    # When
-    catalog.create_namespace(TEST_TABLE_NAMESPACE, TEST_TABLE_PROPERTIES)
-
-    # Then
-    assert catalog._namespace_exists(TEST_TABLE_NAMESPACE)
-    assert TEST_TABLE_PROPERTIES == 
catalog.load_namespace_properties(TEST_TABLE_NAMESPACE)
-
-
-def test_create_namespace_raises_error_on_existing_namespace(catalog: 
InMemoryCatalog) -> None:
-    # Given
-    catalog.create_namespace(TEST_TABLE_NAMESPACE, TEST_TABLE_PROPERTIES)
-    # When
-    with pytest.raises(NamespaceAlreadyExistsError, 
match=NAMESPACE_ALREADY_EXISTS_ERROR):
-        catalog.create_namespace(TEST_TABLE_NAMESPACE, TEST_TABLE_PROPERTIES)
-
-
-def test_get_namespace_metadata_raises_error_when_namespace_does_not_exist(catalog: InMemoryCatalog) -> None:
-    with pytest.raises(NoSuchNamespaceError, match=NO_SUCH_NAMESPACE_ERROR):
-        catalog.load_namespace_properties(TEST_TABLE_NAMESPACE)
-
-
-def test_list_namespaces(catalog: InMemoryCatalog) -> None:
-    # Given
-    catalog.create_namespace(TEST_TABLE_NAMESPACE, TEST_TABLE_PROPERTIES)
-    # When
-    namespaces = catalog.list_namespaces()
-    # Then
-    assert TEST_TABLE_NAMESPACE[:1] in namespaces
-
-    # When
-    namespaces = catalog.list_namespaces(TEST_TABLE_NAMESPACE)
-    # Then
-    assert not namespaces
-
-
-def test_drop_namespace(catalog: InMemoryCatalog) -> None:
-    # Given
-    catalog.create_namespace(TEST_TABLE_NAMESPACE, TEST_TABLE_PROPERTIES)
-    # When
-    catalog.drop_namespace(TEST_TABLE_NAMESPACE)
-    # Then
-    assert not catalog._namespace_exists(TEST_TABLE_NAMESPACE)
-
-
-def test_drop_namespace_raises_error_when_namespace_does_not_exist(catalog: 
InMemoryCatalog) -> None:
-    with pytest.raises(NoSuchNamespaceError, 
match=DROP_NOT_EXISTING_NAMESPACE_ERROR):
-        catalog.drop_namespace(TEST_TABLE_NAMESPACE)
-
-
-def test_drop_namespace_raises_error_when_namespace_not_empty(catalog: 
InMemoryCatalog) -> None:
-    # Given
-    given_catalog_has_a_table(catalog)
-    # When
-    with pytest.raises(NamespaceNotEmptyError, 
match=NAMESPACE_NOT_EMPTY_ERROR):
-        catalog.drop_namespace(TEST_TABLE_NAMESPACE)
-
-
-def test_list_tables(catalog: InMemoryCatalog) -> None:
-    # Given
-    given_catalog_has_a_table(catalog)
-    # When
-    tables = catalog.list_tables(namespace=TEST_TABLE_NAMESPACE)
-    # Then
-    assert tables
-    assert TEST_TABLE_IDENTIFIER in tables
-
-
-def test_list_tables_under_a_namespace(catalog: InMemoryCatalog) -> None:
-    # Given
-    given_catalog_has_a_table(catalog)
-    new_namespace = ("new", "namespace")
-    catalog.create_namespace(new_namespace)
-    # When
-    all_tables = catalog.list_tables(namespace=TEST_TABLE_NAMESPACE)
-    new_namespace_tables = catalog.list_tables(new_namespace)
-    # Then
-    assert all_tables
-    assert TEST_TABLE_IDENTIFIER in all_tables
-    assert new_namespace_tables == []
-
-
-def test_update_namespace_metadata(catalog: InMemoryCatalog) -> None:
-    # Given
-    catalog.create_namespace(TEST_TABLE_NAMESPACE, TEST_TABLE_PROPERTIES)
-
-    # When
-    new_metadata = {"key3": "value3", "key4": "value4"}
-    summary = catalog.update_namespace_properties(TEST_TABLE_NAMESPACE, 
updates=new_metadata)
-
-    # Then
-    assert catalog._namespace_exists(TEST_TABLE_NAMESPACE)
-    assert new_metadata.items() <= 
catalog.load_namespace_properties(TEST_TABLE_NAMESPACE).items()
-    assert summary.removed == []
-    assert sorted(summary.updated) == ["key3", "key4"]
-    assert summary.missing == []
-
-
-def test_update_namespace_metadata_removals(catalog: InMemoryCatalog) -> None:
-    # Given
-    catalog.create_namespace(TEST_TABLE_NAMESPACE, TEST_TABLE_PROPERTIES)
-
-    # When
-    new_metadata = {"key3": "value3", "key4": "value4"}
-    remove_metadata = {"key1"}
-    summary = catalog.update_namespace_properties(TEST_TABLE_NAMESPACE, 
remove_metadata, new_metadata)
-
-    # Then
-    assert catalog._namespace_exists(TEST_TABLE_NAMESPACE)
-    assert new_metadata.items() <= 
catalog.load_namespace_properties(TEST_TABLE_NAMESPACE).items()
-    assert 
remove_metadata.isdisjoint(catalog.load_namespace_properties(TEST_TABLE_NAMESPACE).keys())
-    assert summary.removed == ["key1"]
-    assert sorted(summary.updated) == ["key3", "key4"]
-    assert summary.missing == []
-
-
-def test_update_namespace_metadata_raises_error_when_namespace_does_not_exist(catalog: InMemoryCatalog) -> None:
-    with pytest.raises(NoSuchNamespaceError, match=NO_SUCH_NAMESPACE_ERROR):
-        catalog.update_namespace_properties(TEST_TABLE_NAMESPACE, 
updates=TEST_TABLE_PROPERTIES)
-
-
-def test_commit_table(catalog: InMemoryCatalog) -> None:
-    # Given
-    given_table = given_catalog_has_a_table(catalog)
-    new_schema = Schema(
-        NestedField(1, "x", LongType()),
-        NestedField(2, "y", LongType(), doc="comment"),
-        NestedField(3, "z", LongType()),
-        NestedField(4, "add", LongType()),
-    )
-
-    # When
-    response = given_table.catalog.commit_table(
-        given_table,
-        updates=(
-            AddSchemaUpdate(schema=new_schema),
-            SetCurrentSchemaUpdate(schema_id=-1),
-        ),
-        requirements=(),
-    )
-
-    # Then
-    assert response.metadata.table_uuid == given_table.metadata.table_uuid
-    assert len(response.metadata.schemas) == 2
-    assert response.metadata.schemas[1] == new_schema
-    assert response.metadata.current_schema_id == new_schema.schema_id
-
-
-def test_add_column(catalog: InMemoryCatalog) -> None:
-    given_table = given_catalog_has_a_table(catalog)
-
-    given_table.update_schema().add_column(path="new_column1", 
field_type=IntegerType()).commit()
-
-    assert given_table.schema() == Schema(
-        NestedField(field_id=1, name="x", field_type=LongType(), 
required=True),
-        NestedField(field_id=2, name="y", field_type=LongType(), 
required=True, doc="comment"),
-        NestedField(field_id=3, name="z", field_type=LongType(), 
required=True),
-        NestedField(field_id=4, name="new_column1", field_type=IntegerType(), 
required=False),
-        identifier_field_ids=[],
-    )
-    assert given_table.schema().schema_id == 1
-
-    transaction = given_table.transaction()
-    transaction.update_schema().add_column(path="new_column2", 
field_type=IntegerType(), doc="doc").commit()
-    transaction.commit_transaction()
-
-    assert given_table.schema() == Schema(
-        NestedField(field_id=1, name="x", field_type=LongType(), 
required=True),
-        NestedField(field_id=2, name="y", field_type=LongType(), 
required=True, doc="comment"),
-        NestedField(field_id=3, name="z", field_type=LongType(), 
required=True),
-        NestedField(field_id=4, name="new_column1", field_type=IntegerType(), 
required=False),
-        NestedField(field_id=5, name="new_column2", field_type=IntegerType(), 
required=False, doc="doc"),
-        identifier_field_ids=[],
-    )
-    assert given_table.schema().schema_id == 2
-
-
-def test_add_column_with_statement(catalog: InMemoryCatalog) -> None:
-    given_table = given_catalog_has_a_table(catalog)
-
-    with given_table.update_schema() as tx:
-        tx.add_column(path="new_column1", field_type=IntegerType())
-
-    assert given_table.schema() == Schema(
-        NestedField(field_id=1, name="x", field_type=LongType(), 
required=True),
-        NestedField(field_id=2, name="y", field_type=LongType(), 
required=True, doc="comment"),
-        NestedField(field_id=3, name="z", field_type=LongType(), 
required=True),
-        NestedField(field_id=4, name="new_column1", field_type=IntegerType(), 
required=False),
-        identifier_field_ids=[],
-    )
-    assert given_table.schema().schema_id == 1
-
-    with given_table.transaction() as tx:
-        tx.update_schema().add_column(path="new_column2", 
field_type=IntegerType(), doc="doc").commit()
-
-    assert given_table.schema() == Schema(
-        NestedField(field_id=1, name="x", field_type=LongType(), 
required=True),
-        NestedField(field_id=2, name="y", field_type=LongType(), 
required=True, doc="comment"),
-        NestedField(field_id=3, name="z", field_type=LongType(), 
required=True),
-        NestedField(field_id=4, name="new_column1", field_type=IntegerType(), 
required=False),
-        NestedField(field_id=5, name="new_column2", field_type=IntegerType(), 
required=False, doc="doc"),
-        identifier_field_ids=[],
-    )
-    assert given_table.schema().schema_id == 2
-
-
 def test_catalog_repr(catalog: InMemoryCatalog) -> None:
     s = repr(catalog)
     assert s == "test.in_memory.catalog (<class 'pyiceberg.catalog.memory.InMemoryCatalog'>)"
 
 
-def test_table_properties_int_value(catalog: InMemoryCatalog) -> None:
-    # table properties can be set to int, but still serialized to string
-    property_with_int = {"property_name": 42}
-    given_table = given_catalog_has_a_table(catalog, 
properties=property_with_int)
-    assert isinstance(given_table.properties["property_name"], str)
-
-
-def test_table_properties_raise_for_none_value(catalog: InMemoryCatalog) -> 
None:
-    property_with_none = {"property_name": None}
-    with pytest.raises(ValidationError) as exc_info:
-        _ = given_catalog_has_a_table(catalog, properties=property_with_none)
-    assert "None type is not a supported value in properties: property_name" 
in str(exc_info.value)
-
-
-def test_table_writes_metadata_to_custom_location(catalog: InMemoryCatalog) -> 
None:
-    metadata_path = f"{catalog._warehouse_location}/custom/path"
-    catalog.create_namespace(TEST_TABLE_NAMESPACE)
-    table = catalog.create_table(
-        identifier=TEST_TABLE_IDENTIFIER,
-        schema=TEST_TABLE_SCHEMA,
-        partition_spec=TEST_TABLE_PARTITION_SPEC,
-        properties={TableProperties.WRITE_METADATA_PATH: metadata_path},
-    )
-    df = pa.Table.from_pylist([{"x": 123, "y": 456, "z": 789}], 
schema=schema_to_pyarrow(TEST_TABLE_SCHEMA))
-    table.append(df)
-    manifests = table.current_snapshot().manifests(table.io)  # type: ignore
-    location_provider = table.location_provider()
-
-    assert 
location_provider.new_metadata_location("").startswith(metadata_path)
-    assert manifests[0].manifest_path.startswith(metadata_path)
-    assert table.location() != metadata_path
-    assert table.metadata_location.startswith(metadata_path)
-
-
-def test_table_writes_metadata_to_default_path(catalog: InMemoryCatalog) -> 
None:
-    catalog.create_namespace(TEST_TABLE_NAMESPACE)
-    table = catalog.create_table(
-        identifier=TEST_TABLE_IDENTIFIER,
-        schema=TEST_TABLE_SCHEMA,
-        partition_spec=TEST_TABLE_PARTITION_SPEC,
-        properties=TEST_TABLE_PROPERTIES,
-    )
-    metadata_path = f"{table.location()}/metadata"
-    df = pa.Table.from_pylist([{"x": 123, "y": 456, "z": 789}], 
schema=schema_to_pyarrow(TEST_TABLE_SCHEMA))
-    table.append(df)
-    manifests = table.current_snapshot().manifests(table.io)  # type: ignore
-    location_provider = table.location_provider()
-
-    assert 
location_provider.new_metadata_location("").startswith(metadata_path)
-    assert manifests[0].manifest_path.startswith(metadata_path)
-    assert table.metadata_location.startswith(metadata_path)
-
-
-def test_table_metadata_writes_reflect_latest_path(catalog: InMemoryCatalog) 
-> None:
-    catalog.create_namespace(TEST_TABLE_NAMESPACE)
-    table = catalog.create_table(
-        identifier=TEST_TABLE_IDENTIFIER,
-        schema=TEST_TABLE_SCHEMA,
-        partition_spec=TEST_TABLE_PARTITION_SPEC,
-    )
-
-    initial_metadata_path = f"{table.location()}/metadata"
-    assert table.location_provider().new_metadata_location("metadata.json") == 
f"{initial_metadata_path}/metadata.json"
-
-    # update table with new path for metadata
-    new_metadata_path = f"{table.location()}/custom/path"
-    table.transaction().set_properties({TableProperties.WRITE_METADATA_PATH: 
new_metadata_path}).commit_transaction()
-
-    assert table.location_provider().new_metadata_location("metadata.json") == 
f"{new_metadata_path}/metadata.json"
-
-
 class TestCatalogClose:
     """Test catalog close functionality."""
 
diff --git a/tests/catalog/test_catalog_behaviors.py b/tests/catalog/test_catalog_behaviors.py
new file mode 100644
index 00000000..29689358
--- /dev/null
+++ b/tests/catalog/test_catalog_behaviors.py
@@ -0,0 +1,1192 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+Consolidated behavior tests for InMemoryCatalog and SqlCatalog.
+"""
+
+import os
+from pathlib import Path
+from typing import Any
+
+import pyarrow as pa
+import pytest
+from pydantic_core import ValidationError
+from pytest_lazyfixture import lazy_fixture
+from sqlalchemy.exc import IntegrityError
+
+from pyiceberg.catalog import Catalog
+from pyiceberg.exceptions import (
+    CommitFailedException,
+    NamespaceAlreadyExistsError,
+    NamespaceNotEmptyError,
+    NoSuchNamespaceError,
+    NoSuchTableError,
+    TableAlreadyExistsError,
+)
+from pyiceberg.io.pyarrow import _dataframe_to_data_files, schema_to_pyarrow
+from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, 
PartitionField, PartitionSpec
+from pyiceberg.schema import Schema
+from pyiceberg.table import TableProperties
+from pyiceberg.table.snapshots import Operation
+from pyiceberg.table.sorting import NullOrder, SortDirection, SortField, 
SortOrder
+from pyiceberg.table.update import AddSchemaUpdate, SetCurrentSchemaUpdate
+from pyiceberg.transforms import IdentityTransform
+from pyiceberg.typedef import Identifier
+from pyiceberg.types import BooleanType, IntegerType, LongType, NestedField, 
StringType
+
+
+# Name parsing tests
+def test_namespace_from_tuple() -> None:
+    identifier = ("com", "organization", "department", "my_table")
+    namespace_from = Catalog.namespace_from(identifier)
+    assert namespace_from == ("com", "organization", "department")
+
+
+def test_namespace_from_str() -> None:
+    identifier = "com.organization.department.my_table"
+    namespace_from = Catalog.namespace_from(identifier)
+    assert namespace_from == ("com", "organization", "department")
+
+
+def test_name_from_tuple() -> None:
+    identifier = ("com", "organization", "department", "my_table")
+    name_from = Catalog.table_name_from(identifier)
+    assert name_from == "my_table"
+
+
+def test_name_from_str() -> None:
+    identifier = "com.organization.department.my_table"
+    name_from = Catalog.table_name_from(identifier)
+    assert name_from == "my_table"
+
+
+# Create table tests
+def test_create_table(catalog: Catalog, test_table_identifier: Identifier, 
table_schema_simple: Schema) -> None:
+    namespace = Catalog.namespace_from(test_table_identifier)
+    catalog.create_namespace(namespace)
+    table = catalog.create_table(test_table_identifier, table_schema_simple)
+    loaded = catalog.load_table(test_table_identifier)
+    assert loaded.name() == table.name()
+    assert loaded.metadata_location == table.metadata_location
+
+
+def test_create_table_if_not_exists_duplicated_table(
+    catalog: Catalog, table_schema_nested: Schema, test_table_identifier: 
Identifier
+) -> None:
+    namespace = Catalog.namespace_from(test_table_identifier)
+    catalog.create_namespace(namespace)
+    table1 = catalog.create_table(test_table_identifier, table_schema_nested)
+    table2 = catalog.create_table_if_not_exists(test_table_identifier, 
table_schema_nested)
+    assert table1.name() == table2.name()
+
+
+def test_create_table_raises_error_when_table_already_exists(
+    catalog: Catalog, test_table_identifier: Identifier, table_schema_nested: 
Schema
+) -> None:
+    namespace = Catalog.namespace_from(test_table_identifier)
+    catalog.create_namespace(namespace)
+    catalog.create_table(test_table_identifier, table_schema_nested)
+    with pytest.raises(TableAlreadyExistsError):
+        catalog.create_table(test_table_identifier, table_schema_nested)
+
+
+def test_table_exists(catalog: Catalog, test_table_identifier: Identifier, 
table_schema_nested: Schema) -> None:
+    namespace = Catalog.namespace_from(test_table_identifier)
+    catalog.create_namespace(namespace)
+    catalog.create_table(test_table_identifier, table_schema_nested, 
properties={"format-version": "2"})
+    assert catalog.table_exists(test_table_identifier)
+
+
+def test_table_exists_on_table_not_found(catalog: Catalog, 
test_table_identifier: Identifier) -> None:
+    assert not catalog.table_exists(test_table_identifier)
+
+
+def test_create_table_raises_error_when_namespace_does_not_exist(catalog: 
Catalog, table_schema_simple: Schema) -> None:
+    with pytest.raises(NoSuchNamespaceError):
+        catalog.create_table(("non_existent_ns", "table"), table_schema_simple)
+
+
+def test_table_raises_error_on_table_not_found(catalog: Catalog, 
test_table_identifier: Identifier) -> None:
+    identifier_str = ".".join(test_table_identifier)
+    with pytest.raises(NoSuchTableError, match=f"Table does not exist: {identifier_str}"):
+        catalog.load_table(test_table_identifier)
+
+
+def test_create_table_without_namespace(catalog: Catalog, table_schema_nested: 
Schema, table_name: str) -> None:
+    with pytest.raises(NoSuchNamespaceError):
+        catalog.create_table(table_name, table_schema_nested)
+
+
+@pytest.mark.parametrize("format_version", [1, 2])
+def test_create_table_transaction(catalog: Catalog, format_version: int) -> 
None:
+    identifier = 
f"default.arrow_create_table_transaction_{catalog.name}_{format_version}"
+    try:
+        catalog.create_namespace("default")
+    except NamespaceAlreadyExistsError:
+        pass
+
+    try:
+        catalog.drop_table(identifier=identifier)
+    except NoSuchTableError:
+        pass
+
+    pa_table = pa.Table.from_pydict(
+        {
+            "foo": ["a", None, "z"],
+        },
+        schema=pa.schema([pa.field("foo", pa.large_string(), nullable=True)]),
+    )
+
+    pa_table_with_column = pa.Table.from_pydict(
+        {
+            "foo": ["a", None, "z"],
+            "bar": [19, None, 25],
+        },
+        schema=pa.schema(
+            [
+                pa.field("foo", pa.large_string(), nullable=True),
+                pa.field("bar", pa.int32(), nullable=True),
+            ]
+        ),
+    )
+
+    with catalog.create_table_transaction(
+        identifier=identifier, schema=pa_table.schema, 
properties={"format-version": str(format_version)}
+    ) as txn:
+        with txn.update_snapshot().fast_append() as snapshot_update:
+            for data_file in 
_dataframe_to_data_files(table_metadata=txn.table_metadata, df=pa_table, 
io=txn._table.io):
+                snapshot_update.append_data_file(data_file)
+
+        with txn.update_schema() as schema_txn:
+            schema_txn.union_by_name(pa_table_with_column.schema)
+
+        with txn.update_snapshot().fast_append() as snapshot_update:
+            for data_file in _dataframe_to_data_files(
+                table_metadata=txn.table_metadata, df=pa_table_with_column, 
io=txn._table.io
+            ):
+                snapshot_update.append_data_file(data_file)
+
+    tbl = catalog.load_table(identifier=identifier)
+    assert tbl.format_version == format_version
+    assert len(tbl.scan().to_arrow()) == 6
+
+
+def test_create_table_default_sort_order(
+    catalog: Catalog, table_schema_nested: Schema, test_table_identifier: 
Identifier
+) -> None:
+    namespace = Catalog.namespace_from(test_table_identifier)
+    catalog.create_namespace(namespace)
+    table = catalog.create_table(test_table_identifier, table_schema_nested)
+    assert table.sort_order().order_id == 0, "Order ID must match"
+    assert table.sort_order().is_unsorted is True, "Order must be unsorted"
+    catalog.drop_table(test_table_identifier)
+
+
+def test_create_v1_table(catalog: Catalog, table_schema_nested: Schema, 
test_table_identifier: Identifier) -> None:
+    namespace = Catalog.namespace_from(test_table_identifier)
+    catalog.create_namespace(namespace)
+    table = catalog.create_table(test_table_identifier, table_schema_nested, 
properties={"format-version": "1"})
+    assert table.sort_order().order_id == 0, "Order ID must match"
+    assert table.sort_order().is_unsorted is True, "Order must be unsorted"
+    assert table.format_version == 1
+    assert table.spec() == UNPARTITIONED_PARTITION_SPEC
+    catalog.drop_table(test_table_identifier)
+
+
+def test_create_table_custom_sort_order(catalog: Catalog, table_schema_nested: 
Schema, test_table_identifier: Identifier) -> None:
+    namespace = Catalog.namespace_from(test_table_identifier)
+    catalog.create_namespace(namespace)
+    order = SortOrder(SortField(source_id=2, transform=IdentityTransform(), 
null_order=NullOrder.NULLS_FIRST))
+    table = catalog.create_table(test_table_identifier, table_schema_nested, 
sort_order=order)
+    given_sort_order = table.sort_order()
+    assert given_sort_order.order_id == 1, "Order ID must match"
+    assert len(given_sort_order.fields) == 1, "Order must have 1 field"
+    assert given_sort_order.fields[0].direction == SortDirection.ASC, 
"Direction must match"
+    assert given_sort_order.fields[0].null_order == NullOrder.NULLS_FIRST, 
"Null order must match"
+    assert isinstance(given_sort_order.fields[0].transform, 
IdentityTransform), "Transform must match"
+    catalog.drop_table(test_table_identifier)
+
+
+def test_create_table_with_default_warehouse_location(
+    warehouse: Path, catalog_with_warehouse: Catalog, table_schema_nested: 
Schema, test_table_identifier: Identifier
+) -> None:
+    identifier_tuple = Catalog.identifier_to_tuple(test_table_identifier)
+    namespace = Catalog.namespace_from(test_table_identifier)
+    catalog_with_warehouse.create_namespace(namespace)
+    catalog_with_warehouse.create_table(test_table_identifier, 
table_schema_nested)
+    table = catalog_with_warehouse.load_table(test_table_identifier)
+    assert table.name() == identifier_tuple
+    assert table.metadata_location.startswith(f"file://{warehouse}")
+    assert os.path.exists(table.metadata_location[len("file://") :])
+    catalog_with_warehouse.drop_table(test_table_identifier)
+
+
+def test_create_table_location_override(
+    catalog: Catalog,
+    tmp_path: Path,
+    table_schema_nested: Schema,
+    test_table_identifier: Identifier,
+    test_table_properties: dict[str, str],
+) -> None:
+    test_partition_spec = PartitionSpec(PartitionField(name="x", 
transform=IdentityTransform(), source_id=1, field_id=1000))
+    new_location = f"file://{tmp_path}/new_location"
+    namespace = Catalog.namespace_from(test_table_identifier)
+    catalog.create_namespace(namespace)
+    table = catalog.create_table(
+        identifier=test_table_identifier,
+        schema=table_schema_nested,
+        location=new_location,
+        partition_spec=test_partition_spec,
+        properties=test_table_properties,
+    )
+    assert catalog.load_table(test_table_identifier) == table
+    assert table.location() == new_location
+
+
+def test_create_table_removes_trailing_slash_from_location(
+    warehouse: Path, catalog: Catalog, table_schema_nested: Schema, 
test_table_identifier: Identifier
+) -> None:
+    identifier_tuple = Catalog.identifier_to_tuple(test_table_identifier)
+    namespace = Catalog.namespace_from(test_table_identifier)
+    table_name = Catalog.table_name_from(identifier_tuple)
+    location = f"file://{warehouse}/{catalog.name}/{table_name}-given"
+    catalog.create_namespace(namespace)
+    catalog.create_table(test_table_identifier, table_schema_nested, 
location=f"{location}/")
+    table = catalog.load_table(test_table_identifier)
+    assert table.name() == identifier_tuple
+    assert table.metadata_location.startswith(f"file://{warehouse}")
+    assert os.path.exists(table.metadata_location[len("file://") :])
+    assert table.location() == location
+    catalog.drop_table(test_table_identifier)
+
+
+def test_create_tables_idempotency(catalog: Catalog) -> None:
+    # Second initialization should not fail even if tables are already created
+    catalog.create_tables()  # type: ignore[attr-defined]
+    catalog.create_tables()  # type: ignore[attr-defined]
+
+
+def test_create_table_pyarrow_schema(
+    catalog: Catalog,
+    pyarrow_schema_simple_without_ids: pa.Schema,
+    test_table_identifier: Identifier,
+    test_table_properties: dict[str, str],
+) -> None:
+    namespace = Catalog.namespace_from(test_table_identifier)
+    catalog.create_namespace(namespace)
+    table = catalog.create_table(
+        identifier=test_table_identifier,
+        schema=pyarrow_schema_simple_without_ids,
+        properties=test_table_properties,
+    )
+    assert catalog.load_table(test_table_identifier) == table
+
+
+def test_write_pyarrow_schema(catalog: Catalog, test_table_identifier: 
Identifier) -> None:
+    pyarrow_table = pa.Table.from_arrays(
+        [
+            pa.array([None, "A", "B", "C"]),  # 'foo' column
+            pa.array([1, 2, 3, 4]),  # 'bar' column
+            pa.array([True, None, False, True]),  # 'baz' column
+            pa.array([None, "A", "B", "C"]),  # 'large' column
+        ],
+        schema=pa.schema(
+            [
+                pa.field("foo", pa.large_string(), nullable=True),
+                pa.field("bar", pa.int32(), nullable=False),
+                pa.field("baz", pa.bool_(), nullable=True),
+                pa.field("large", pa.large_string(), nullable=True),
+            ]
+        ),
+    )
+    namespace = Catalog.namespace_from(test_table_identifier)
+    catalog.create_namespace(namespace)
+    table = catalog.create_table(test_table_identifier, pyarrow_table.schema)
+    table.append(pyarrow_table)
+
+
+@pytest.mark.parametrize(
+    "schema,expected",
+    [
+        (lazy_fixture("pyarrow_schema_simple_without_ids"), 
lazy_fixture("iceberg_schema_simple_no_ids")),
+        (lazy_fixture("table_schema_simple"), 
lazy_fixture("table_schema_simple")),
+        (lazy_fixture("table_schema_nested"), 
lazy_fixture("table_schema_nested")),
+        (lazy_fixture("pyarrow_schema_nested_without_ids"), 
lazy_fixture("iceberg_schema_nested_no_ids")),
+    ],
+)
+def test_convert_schema_if_needed(
+    schema: Schema | pa.Schema,
+    expected: Schema,
+    catalog: Catalog,
+) -> None:
+    assert expected == catalog._convert_schema_if_needed(schema)
+
+
+# Register table tests
+
+
+def test_register_table(catalog: Catalog, test_table_identifier: Identifier, 
metadata_location: str) -> None:
+    identifier_tuple = Catalog.identifier_to_tuple(test_table_identifier)
+    namespace = Catalog.namespace_from(test_table_identifier)
+    catalog.create_namespace(namespace)
+    table = catalog.register_table(test_table_identifier, metadata_location)
+    assert table.name() == identifier_tuple
+    assert table.metadata_location == metadata_location
+    assert os.path.exists(metadata_location)
+    catalog.drop_table(test_table_identifier)
+
+
+def test_register_existing_table(catalog: Catalog, test_table_identifier: 
Identifier, metadata_location: str) -> None:
+    namespace = Catalog.namespace_from(test_table_identifier)
+    catalog.create_namespace(namespace)
+    catalog.register_table(test_table_identifier, metadata_location)
+    with pytest.raises(TableAlreadyExistsError):
+        catalog.register_table(test_table_identifier, metadata_location)
+
+
+# Load table tests
+
+
+def test_load_table(catalog: Catalog, table_schema_nested: Schema, 
test_table_identifier: Identifier) -> None:
+    namespace = Catalog.namespace_from(test_table_identifier)
+    catalog.create_namespace(namespace)
+    table = catalog.create_table(test_table_identifier, table_schema_nested)
+    loaded_table = catalog.load_table(test_table_identifier)
+    assert table.name() == loaded_table.name()
+    assert table.metadata_location == loaded_table.metadata_location
+    assert table.metadata == loaded_table.metadata
+
+
+def test_load_table_from_self_identifier(
+    catalog: Catalog, table_schema_nested: Schema, test_table_identifier: 
Identifier
+) -> None:
+    identifier_tuple = Catalog.identifier_to_tuple(test_table_identifier)
+    namespace = Catalog.namespace_from(test_table_identifier)
+    catalog.create_namespace(namespace)
+    table = catalog.create_table(test_table_identifier, table_schema_nested)
+    intermediate = catalog.load_table(test_table_identifier)
+    assert intermediate.name() == identifier_tuple
+    loaded_table = catalog.load_table(intermediate.name())
+    assert table.name() == loaded_table.name()
+    assert table.metadata_location == loaded_table.metadata_location
+    assert table.metadata == loaded_table.metadata
+
+
+# Rename table tests
+
+
+def test_rename_table(
+    catalog: Catalog, table_schema_nested: Schema, test_table_identifier: 
Identifier, another_table_identifier: Identifier
+) -> None:
+    from_namespace = Catalog.namespace_from(test_table_identifier)
+    to_namespace = Catalog.namespace_from(another_table_identifier)
+    catalog.create_namespace(from_namespace)
+    catalog.create_namespace(to_namespace)
+    table = catalog.create_table(test_table_identifier, table_schema_nested)
+    assert table.name() == test_table_identifier
+    catalog.rename_table(test_table_identifier, another_table_identifier)
+    new_table = catalog.load_table(another_table_identifier)
+    assert new_table.name() == another_table_identifier
+    assert new_table.metadata_location == table.metadata_location
+    with pytest.raises(NoSuchTableError):
+        catalog.load_table(test_table_identifier)
+
+
+def test_rename_table_from_self_identifier(
+    catalog: Catalog, table_schema_nested: Schema, test_table_identifier: 
Identifier, another_table_identifier: Identifier
+) -> None:
+    from_namespace = Catalog.namespace_from(test_table_identifier)
+    to_namespace = Catalog.namespace_from(another_table_identifier)
+    catalog.create_namespace(from_namespace)
+    catalog.create_namespace(to_namespace)
+    table = catalog.create_table(test_table_identifier, table_schema_nested)
+    assert table.name() == test_table_identifier
+    catalog.rename_table(table.name(), another_table_identifier)
+    new_table = catalog.load_table(another_table_identifier)
+    assert new_table.name() == another_table_identifier
+    assert new_table.metadata_location == table.metadata_location
+    with pytest.raises(NoSuchTableError):
+        catalog.load_table(table.name())
+    with pytest.raises(NoSuchTableError):
+        catalog.load_table(test_table_identifier)
+
+
+def test_rename_table_to_existing_one(
+    catalog: Catalog, table_schema_nested: Schema, test_table_identifier: 
Identifier, another_table_identifier: Identifier
+) -> None:
+    from_namespace = Catalog.namespace_from(test_table_identifier)
+    to_namespace = Catalog.namespace_from(another_table_identifier)
+    catalog.create_namespace(from_namespace)
+    catalog.create_namespace(to_namespace)
+    table = catalog.create_table(test_table_identifier, table_schema_nested)
+    assert table.name() == test_table_identifier
+    new_table = catalog.create_table(another_table_identifier, 
table_schema_nested)
+    assert new_table.name() == another_table_identifier
+    with pytest.raises(TableAlreadyExistsError):
+        catalog.rename_table(test_table_identifier, another_table_identifier)
+
+
+def test_rename_missing_table(catalog: Catalog, test_table_identifier: 
Identifier, another_table_identifier: Identifier) -> None:
+    from_namespace = Catalog.namespace_from(test_table_identifier)
+    to_namespace = Catalog.namespace_from(another_table_identifier)
+    catalog.create_namespace(from_namespace)
+    catalog.create_namespace(to_namespace)
+    with pytest.raises(NoSuchTableError):
+        catalog.rename_table(test_table_identifier, another_table_identifier)
+
+
+def test_rename_table_to_missing_namespace(
+    catalog: Catalog, table_schema_nested: Schema, test_table_identifier: 
Identifier, another_table_identifier: Identifier
+) -> None:
+    from_namespace = Catalog.namespace_from(test_table_identifier)
+    catalog.create_namespace(from_namespace)
+    table = catalog.create_table(test_table_identifier, table_schema_nested)
+    assert table.name() == test_table_identifier
+    with pytest.raises(NoSuchNamespaceError):
+        catalog.rename_table(test_table_identifier, another_table_identifier)
+
+
+# Drop table tests
+
+
+def test_drop_table(catalog: Catalog, table_schema_nested: Schema, 
test_table_identifier: Identifier) -> None:
+    identifier_tuple = Catalog.identifier_to_tuple(test_table_identifier)
+    namespace = Catalog.namespace_from(test_table_identifier)
+    catalog.create_namespace(namespace)
+    table = catalog.create_table(test_table_identifier, table_schema_nested)
+    assert table.name() == identifier_tuple
+    catalog.drop_table(test_table_identifier)
+    with pytest.raises(NoSuchTableError):
+        catalog.load_table(test_table_identifier)
+
+
+def test_drop_table_from_self_identifier(
+    catalog: Catalog, table_schema_nested: Schema, test_table_identifier: 
Identifier
+) -> None:
+    identifier_tuple = Catalog.identifier_to_tuple(test_table_identifier)
+    namespace = Catalog.namespace_from(test_table_identifier)
+    catalog.create_namespace(namespace)
+    table = catalog.create_table(test_table_identifier, table_schema_nested)
+    assert table.name() == identifier_tuple
+    catalog.drop_table(table.name())
+    with pytest.raises(NoSuchTableError):
+        catalog.load_table(table.name())
+    with pytest.raises(NoSuchTableError):
+        catalog.load_table(test_table_identifier)
+
+
+def test_drop_table_that_does_not_exist_raise_error(catalog: Catalog, 
test_table_identifier: Identifier) -> None:
+    with pytest.raises(NoSuchTableError):
+        catalog.drop_table(test_table_identifier)
+
+
+def test_purge_table(catalog: Catalog, table_schema_simple: Schema, 
test_table_identifier: Identifier) -> None:
+    namespace = Catalog.namespace_from(test_table_identifier)
+    catalog.create_namespace(namespace)
+    catalog.create_table(test_table_identifier, table_schema_simple)
+    catalog.purge_table(test_table_identifier)
+    with pytest.raises(NoSuchTableError, match=f"Table does not exist: {'.'.join(test_table_identifier)}"):
+        catalog.load_table(test_table_identifier)
+
+
+# List tables tests
+
+
+def test_list_tables(
+    catalog: Catalog, table_schema_nested: Schema, test_table_identifier: 
Identifier, another_table_identifier: Identifier
+) -> None:
+    namespace_1 = Catalog.namespace_from(test_table_identifier)
+    namespace_2 = Catalog.namespace_from(another_table_identifier)
+    catalog.create_namespace(namespace_1)
+    catalog.create_namespace(namespace_2)
+    catalog.create_table(test_table_identifier, table_schema_nested)
+    catalog.create_table(another_table_identifier, table_schema_nested)
+    identifier_list = catalog.list_tables(namespace_1)
+    assert len(identifier_list) == 1
+    assert test_table_identifier in identifier_list
+
+    identifier_list = catalog.list_tables(namespace_2)
+    assert len(identifier_list) == 1
+    assert another_table_identifier in identifier_list
+
+
+def test_list_tables_under_a_namespace(catalog: Catalog, table_schema_nested: 
Schema, test_table_identifier: Identifier) -> None:
+    namespace = Catalog.namespace_from(test_table_identifier)
+    catalog.create_namespace(namespace)
+    catalog.create_table(test_table_identifier, table_schema_nested)
+    new_namespace = ("new", "namespace")
+    catalog.create_namespace(new_namespace)
+    all_tables = catalog.list_tables(namespace=namespace)
+    new_namespace_tables = catalog.list_tables(new_namespace)
+    assert all_tables
+    assert test_table_identifier in all_tables
+    assert new_namespace_tables == []
+
+
+def test_list_tables_when_missing_namespace(catalog: Catalog, test_namespace: 
Identifier) -> None:
+    with pytest.raises(NoSuchNamespaceError):
+        catalog.list_tables(test_namespace)
+
+
+# Commit table tests
+def test_commit_table(catalog: Catalog, table_schema_nested: Schema, 
test_table_identifier: Identifier) -> None:
+    namespace = Catalog.namespace_from(test_table_identifier)
+    catalog.create_namespace(namespace)
+    table = catalog.create_table(test_table_identifier, table_schema_nested)
+    last_updated_ms = table.metadata.last_updated_ms
+    original_table_metadata_location = table.metadata_location
+    original_table_last_updated_ms = table.metadata.last_updated_ms
+
+    assert catalog._parse_metadata_version(table.metadata_location) == 0  # 
type: ignore[attr-defined]
+    assert table.metadata.current_schema_id == 0
+
+    transaction = table.transaction()
+    update = transaction.update_schema()
+    update.add_column(path="b", field_type=IntegerType())
+    update.commit()
+    transaction.commit_transaction()
+
+    updated_table_metadata = table.metadata
+
+    assert catalog._parse_metadata_version(table.metadata_location) == 1  # 
type: ignore[attr-defined]
+    assert updated_table_metadata.current_schema_id == 1
+    assert len(updated_table_metadata.schemas) == 2
+    new_schema = next(schema for schema in updated_table_metadata.schemas if 
schema.schema_id == 1)
+    assert new_schema
+    assert new_schema == update._apply()
+    assert new_schema.find_field("b").field_type == IntegerType()
+    assert updated_table_metadata.last_updated_ms > last_updated_ms
+    assert len(updated_table_metadata.metadata_log) == 1
+    assert updated_table_metadata.metadata_log[0].metadata_file == 
original_table_metadata_location
+    assert updated_table_metadata.metadata_log[0].timestamp_ms == 
original_table_last_updated_ms
+
+
+def test_catalog_commit_table_applies_schema_updates(
+    catalog: Catalog,
+    table_schema_nested: Schema,
+    test_table_identifier: Identifier,
+) -> None:
+    namespace = Catalog.namespace_from(test_table_identifier)
+    catalog.create_namespace(namespace)
+    table = catalog.create_table(test_table_identifier, table_schema_nested)
+
+    new_schema = Schema(
+        NestedField(1, "x", LongType()),
+        NestedField(2, "y", LongType(), doc="comment"),
+        NestedField(3, "z", LongType()),
+        NestedField(4, "add", LongType()),
+    )
+
+    response = table.catalog.commit_table(
+        table,
+        updates=(
+            AddSchemaUpdate(schema=new_schema),
+            SetCurrentSchemaUpdate(),
+        ),
+        requirements=(),
+    )
+    assert response.metadata.table_uuid == table.metadata.table_uuid
+    assert len(response.metadata.schemas) == 2
+    assert response.metadata.schemas[1] == new_schema
+    assert response.metadata.current_schema_id == new_schema.schema_id
+
+
+def test_concurrent_commit_table(catalog: Catalog, table_schema_nested: 
Schema, test_table_identifier: Identifier) -> None:
+    namespace = Catalog.namespace_from(test_table_identifier)
+    catalog.create_namespace(namespace)
+    table_a = catalog.create_table(test_table_identifier, table_schema_nested)
+    table_b = catalog.load_table(test_table_identifier)
+
+    with table_a.update_schema() as update:
+        update.add_column(path="b", field_type=IntegerType())
+
+    with pytest.raises(CommitFailedException, match="Requirement failed: current schema id has changed: expected 0, found 1"):
+        # This one should fail since it already has been updated
+        with table_b.update_schema() as update:
+            update.add_column(path="c", field_type=IntegerType())
+
+
+def test_delete_metadata_multiple(catalog: Catalog, table_schema_nested: 
Schema, random_table_identifier: Identifier) -> None:
+    namespace = Catalog.namespace_from(random_table_identifier)
+    catalog.create_namespace(namespace)
+    table = catalog.create_table(random_table_identifier, table_schema_nested)
+
+    original_metadata_location = table.metadata_location
+
+    for i in range(5):
+        with table.transaction() as transaction:
+            with transaction.update_schema() as update:
+                update.add_column(path=f"new_column_{i}", 
field_type=IntegerType())
+
+    assert len(table.metadata.metadata_log) == 5
+    assert os.path.exists(original_metadata_location[len("file://") :])
+
+    # Set the max versions property to 2, and delete after commit
+    new_property = {
+        TableProperties.METADATA_PREVIOUS_VERSIONS_MAX: "2",
+        TableProperties.METADATA_DELETE_AFTER_COMMIT_ENABLED: "true",
+    }
+
+    with table.transaction() as transaction:
+        transaction.set_properties(properties=new_property)
+
+    # Verify that only the most recent metadata files are kept
+    assert len(table.metadata.metadata_log) == 2
+    updated_metadata_1, updated_metadata_2 = table.metadata.metadata_log
+
+    # new metadata log was added, so earlier metadata logs are removed.
+    with table.transaction() as transaction:
+        with transaction.update_schema() as update:
+            update.add_column(path="new_column_x", field_type=IntegerType())
+
+    assert len(table.metadata.metadata_log) == 2
+    assert not os.path.exists(original_metadata_location[len("file://") :])
+    assert not os.path.exists(updated_metadata_1.metadata_file[len("file://") 
:])
+    assert os.path.exists(updated_metadata_2.metadata_file[len("file://") :])
+
+
+# Table properties tests
+
+
+def test_table_properties_int_value(catalog: Catalog, table_schema_simple: 
Schema, test_table_identifier: Identifier) -> None:
+    # table properties can be set to int, but still serialized to string
+    namespace = Catalog.namespace_from(test_table_identifier)
+    catalog.create_namespace(namespace)
+    property_with_int = {"property_name": 42}
+    table = catalog.create_table(test_table_identifier, table_schema_simple, 
properties=property_with_int)
+    assert isinstance(table.properties["property_name"], str)
+
+
+def test_table_properties_raise_for_none_value(
+    catalog: Catalog, table_schema_simple: Schema, test_table_identifier: 
Identifier
+) -> None:
+    namespace = Catalog.namespace_from(test_table_identifier)
+    catalog.create_namespace(namespace)
+    property_with_none = {"property_name": None}
+    with pytest.raises(ValidationError) as exc_info:
+        _ = catalog.create_table(test_table_identifier, table_schema_simple, 
properties=property_with_none)
+    assert "None type is not a supported value in properties: property_name" 
in str(exc_info.value)
+
+
+# Append table
+
+
+def test_append_table(catalog: Catalog, table_schema_simple: Schema, test_table_identifier: Identifier) -> None:
+    """Append a single row to a new table and verify the snapshot metadata and the data read back."""
+    catalog.create_namespace(Catalog.namespace_from(test_table_identifier))
+    table = catalog.create_table(test_table_identifier, table_schema_simple)
+
+    arrow_schema = schema_to_pyarrow(table_schema_simple)
+    df = pa.Table.from_pydict({"foo": ["a"], "bar": [1], "baz": [True]}, schema=arrow_schema)
+    table.append(df)
+
+    # exactly one snapshot exists and it was written in APPEND mode
+    snapshots = table.metadata.snapshots
+    assert len(snapshots) == 1
+    snapshot = snapshots[0]
+    assert snapshot.snapshot_id == table.metadata.current_snapshot_id
+    assert snapshot.parent_snapshot_id is None
+    assert snapshot.sequence_number == 1
+
+    summary = snapshot.summary
+    assert summary is not None
+    assert summary.operation == Operation.APPEND
+    for counter in ("added-data-files", "added-records", "total-data-files", "total-records"):
+        assert summary[counter] == "1"
+    assert len(table.metadata.metadata_log) == 1
+
+    # the appended row round-trips through a scan
+    assert df == table.scan().to_arrow()
+
+
+# Test writes
+def test_table_writes_metadata_to_custom_location(
+    catalog: Catalog,
+    test_table_identifier: Identifier,
+    table_schema_simple: Schema,
+    warehouse: Path,
+) -> None:
+    """With ``TableProperties.WRITE_METADATA_PATH`` set, metadata and manifests are written under that path."""
+    custom_metadata_path = f"file://{warehouse}/custom/path"
+    catalog.create_namespace(Catalog.namespace_from(test_table_identifier))
+    table = catalog.create_table(
+        identifier=test_table_identifier,
+        schema=table_schema_simple,
+        properties={TableProperties.WRITE_METADATA_PATH: custom_metadata_path},
+    )
+
+    arrow_schema = schema_to_pyarrow(table_schema_simple)
+    table.append(pa.Table.from_pydict({"foo": ["a"], "bar": [1], "baz": [True]}, schema=arrow_schema))
+
+    current = table.current_snapshot()
+    assert current is not None
+    manifest_files = current.manifests(table.io)
+    provider = table.location_provider()
+
+    assert provider.new_metadata_location("").startswith(custom_metadata_path)
+    assert manifest_files[0].manifest_path.startswith(custom_metadata_path)
+    # the table location itself is not moved to the custom metadata path
+    assert table.location() != custom_metadata_path
+    assert table.metadata_location.startswith(custom_metadata_path)
+
+
+def test_table_writes_metadata_to_default_path(
+    catalog: Catalog,
+    test_table_identifier: Identifier,
+    table_schema_simple: Schema,
+    test_table_properties: dict[str, str],
+) -> None:
+    """A table created with the ``test_table_properties`` fixture writes metadata under ``<location>/metadata``."""
+    catalog.create_namespace(Catalog.namespace_from(test_table_identifier))
+    table = catalog.create_table(
+        identifier=test_table_identifier,
+        schema=table_schema_simple,
+        properties=test_table_properties,
+    )
+    default_metadata_path = f"{table.location()}/metadata"
+
+    arrow_schema = schema_to_pyarrow(table_schema_simple)
+    table.append(pa.Table.from_pydict({"foo": ["a"], "bar": [1], "baz": [True]}, schema=arrow_schema))
+
+    current = table.current_snapshot()
+    assert current is not None
+    manifest_files = current.manifests(table.io)
+    provider = table.location_provider()
+
+    assert provider.new_metadata_location("").startswith(default_metadata_path)
+    assert manifest_files[0].manifest_path.startswith(default_metadata_path)
+    assert table.metadata_location.startswith(default_metadata_path)
+
+
+def test_table_metadata_writes_reflect_latest_path(
+    catalog: Catalog,
+    test_table_identifier: Identifier,
+    table_schema_simple: Schema,
+    warehouse: Path,
+) -> None:
+    """Changing ``TableProperties.WRITE_METADATA_PATH`` redirects newly generated metadata locations."""
+    file_name = "metadata.json"
+    catalog.create_namespace(Catalog.namespace_from(test_table_identifier))
+    table = catalog.create_table(
+        identifier=test_table_identifier,
+        schema=table_schema_simple,
+    )
+
+    # before the update, locations resolve under the table's own metadata directory
+    default_location = f"{table.location()}/metadata/{file_name}"
+    assert table.location_provider().new_metadata_location(file_name) == default_location
+
+    # point the table at a new metadata path
+    new_metadata_path = f"file://{warehouse}/custom/path"
+    pending = table.transaction().set_properties({TableProperties.WRITE_METADATA_PATH: new_metadata_path})
+    pending.commit_transaction()
+
+    assert table.location_provider().new_metadata_location(file_name) == f"{new_metadata_path}/{file_name}"
+
+
+@pytest.mark.parametrize("format_version", [1, 2])
+def test_write_and_evolve(catalog: Catalog, format_version: int) -> None:
+    """Evolve a table's schema, then append, overwrite and delete rows inside a single transaction.
+
+    Runs once per table format version; the test passes when the transaction commits without raising.
+    """
+    identifier = f"default.arrow_write_data_and_evolve_schema_v{format_version}"
+
+    # Idempotent setup, matching test_merge_manifests_local_file_system.
+    catalog.create_namespace_if_not_exists("default")
+
+    # Drop any leftover table so the test can be re-run against the same catalog.
+    try:
+        catalog.drop_table(identifier=identifier)
+    except NoSuchTableError:
+        pass
+
+    pa_table = pa.Table.from_pydict(
+        {
+            "foo": ["a", None, "z"],
+        },
+        schema=pa.schema([pa.field("foo", pa.large_string(), nullable=True)]),
+    )
+
+    tbl = catalog.create_table(identifier=identifier, schema=pa_table.schema, properties={"format-version": str(format_version)})
+
+    pa_table_with_column = pa.Table.from_pydict(
+        {
+            "foo": ["a", None, "z"],
+            "bar": [19, None, 25],
+        },
+        schema=pa.schema(
+            [
+                pa.field("foo", pa.large_string(), nullable=True),
+                pa.field("bar", pa.int32(), nullable=True),
+            ]
+        ),
+    )
+
+    with tbl.transaction() as txn:
+        # The schema change must be applied before rows carrying the new column are written.
+        with txn.update_schema() as schema_txn:
+            schema_txn.union_by_name(pa_table_with_column.schema)
+
+        txn.append(pa_table_with_column)
+        txn.overwrite(pa_table_with_column)
+        txn.delete("foo = 'a'")
+
+
+# Merge manifests
+@pytest.mark.parametrize("format_version", [1, 2])
+def test_merge_manifests_local_file_system(catalog: Catalog, arrow_table_with_null: pa.Table, format_version: int) -> None:
+    """Five appends with manifest merging enabled leave a single manifest and all rows readable."""
+    # To catch manifest file name collision bug during merge:
+    # https://github.com/apache/iceberg-python/pull/363#discussion_r1660691918
+    table_identifier = "default.test_merge_manifest"
+    catalog.create_namespace_if_not_exists("default")
+    try:
+        catalog.drop_table(table_identifier)
+    except NoSuchTableError:
+        pass
+    tbl = catalog.create_table(
+        table_identifier,
+        arrow_table_with_null.schema,
+        properties={
+            "commit.manifest-merge.enabled": "true",
+            "commit.manifest.min-count-to-merge": "2",
+            # Passed as a string, like every other property value here and in test_write_and_evolve.
+            "format-version": str(format_version),
+        },
+    )
+
+    append_count = 5
+    for _ in range(append_count):
+        tbl.append(arrow_table_with_null)
+
+    assert len(tbl.scan().to_arrow()) == append_count * len(arrow_table_with_null)
+    current_snapshot = tbl.current_snapshot()
+    assert current_snapshot is not None
+    manifests = current_snapshot.manifests(tbl.io)
+    assert len(manifests) == 1
+
+
+# Add column to table
+
+
+def test_add_column(catalog: Catalog, table_schema_simple: Schema, random_table_identifier: Identifier) -> None:
+    """Add one column through ``update_schema`` and one through a transaction, checking schema and schema id."""
+    catalog.create_namespace(Catalog.namespace_from(random_table_identifier))
+    table = catalog.create_table(random_table_identifier, table_schema_simple)
+
+    original_fields = (
+        NestedField(field_id=1, name="foo", field_type=StringType(), required=False),
+        NestedField(field_id=2, name="bar", field_type=IntegerType(), required=True),
+        NestedField(field_id=3, name="baz", field_type=BooleanType(), required=False),
+    )
+    first_added = NestedField(field_id=4, name="new_column1", field_type=IntegerType(), required=False)
+    second_added = NestedField(field_id=5, name="new_column2", field_type=IntegerType(), required=False, doc="doc")
+
+    # first column: committed directly on the schema update
+    table.update_schema().add_column(path="new_column1", field_type=IntegerType()).commit()
+    assert table.schema() == Schema(*original_fields, first_added, schema_id=1, identifier_field_ids=[2])
+    assert table.schema().schema_id == 1
+
+    # second column: committed through an explicit transaction
+    transaction = table.transaction()
+    transaction.update_schema().add_column(path="new_column2", field_type=IntegerType(), doc="doc").commit()
+    transaction.commit_transaction()
+
+    assert table.schema() == Schema(*original_fields, first_added, second_added, identifier_field_ids=[2])
+    assert table.schema().schema_id == 2
+
+
+def test_add_column_with_statement(catalog: Catalog, table_schema_simple: Schema, random_table_identifier: Identifier) -> None:
+    """Add columns using the ``with`` form of ``update_schema`` and of ``transaction``."""
+    catalog.create_namespace(Catalog.namespace_from(random_table_identifier))
+    table = catalog.create_table(random_table_identifier, table_schema_simple)
+
+    original_fields = (
+        NestedField(field_id=1, name="foo", field_type=StringType(), required=False),
+        NestedField(field_id=2, name="bar", field_type=IntegerType(), required=True),
+        NestedField(field_id=3, name="baz", field_type=BooleanType(), required=False),
+    )
+    first_added = NestedField(field_id=4, name="new_column1", field_type=IntegerType(), required=False)
+    second_added = NestedField(field_id=5, name="new_column2", field_type=IntegerType(), required=False, doc="doc")
+
+    with table.update_schema() as schema_update:
+        schema_update.add_column(path="new_column1", field_type=IntegerType())
+
+    assert table.schema() == Schema(*original_fields, first_added, identifier_field_ids=[2])
+    assert table.schema().schema_id == 1
+
+    with table.transaction() as txn:
+        txn.update_schema().add_column(path="new_column2", field_type=IntegerType(), doc="doc").commit()
+
+    assert table.schema() == Schema(*original_fields, first_added, second_added, identifier_field_ids=[2])
+    assert table.schema().schema_id == 2
+
+
+# Namespace tests
+
+
+def test_create_namespace(catalog: Catalog, test_namespace: Identifier, test_table_properties: dict[str, str]) -> None:
+    """A created namespace exists, is listed by its top-level name, and returns the properties it was given."""
+    catalog.create_namespace(test_namespace, test_table_properties)
+    assert catalog._namespace_exists(test_namespace)  # type: ignore[attr-defined]
+
+    top_level = Catalog.identifier_to_tuple(test_namespace)[:1]
+    assert top_level in catalog.list_namespaces()
+
+    loaded_properties = catalog.load_namespace_properties(test_namespace)
+    assert test_table_properties == loaded_properties
+
+
+def test_create_namespace_raises_error_on_existing_namespace(
+    catalog: Catalog, test_namespace: Identifier, test_table_properties: dict[str, str]
+) -> None:
+    """Creating the same namespace a second time raises ``NamespaceAlreadyExistsError``."""
+    create_args = (test_namespace, test_table_properties)
+    catalog.create_namespace(*create_args)
+    with pytest.raises(NamespaceAlreadyExistsError):
+        catalog.create_namespace(*create_args)
+
+
+def test_create_namespace_if_not_exists(catalog: Catalog, database_name: str) -> None:
+    """``create_namespace_if_not_exists`` on an existing namespace does not raise and leaves it listed."""
+    expected_entry = (database_name,)
+
+    catalog.create_namespace(database_name)
+    assert expected_entry in catalog.list_namespaces()
+
+    catalog.create_namespace_if_not_exists(database_name)
+    assert expected_entry in catalog.list_namespaces()
+
+
+def test_create_namespaces_sharing_same_prefix(catalog: Catalog, test_namespace: Identifier) -> None:
+    """A namespace and a child namespace beneath it can both be created, parent first."""
+    parent_namespace = test_namespace
+    catalog.create_namespace(parent_namespace)
+    # the child shares the parent's full identifier as its prefix
+    catalog.create_namespace(parent_namespace + ("child",))
+
+
+def test_create_namespace_with_comment_and_location(catalog: Catalog, test_namespace: Identifier) -> None:
+    """``comment`` and ``location`` properties given at creation are returned when the namespace is loaded."""
+    test_location = "/test/location"
+    description = "this is a test description"
+    catalog.create_namespace(
+        namespace=test_namespace,
+        properties={"comment": description, "location": test_location},
+    )
+
+    top_level = Catalog.identifier_to_tuple(test_namespace)[:1]
+    assert top_level in catalog.list_namespaces()
+
+    loaded = catalog.load_namespace_properties(test_namespace)
+    assert loaded["comment"] == "this is a test description"
+    assert loaded["location"] == test_location
+
+
+@pytest.mark.filterwarnings("ignore")
+def test_create_namespace_with_null_properties(catalog: Catalog, test_namespace: Identifier) -> None:
+    """A ``None`` property key or a ``None`` property value makes ``create_namespace`` raise ``IntegrityError``."""
+    # None as a property key
+    with pytest.raises(IntegrityError):
+        catalog.create_namespace(namespace=test_namespace, properties={None: "value"})  # type: ignore
+
+    # None as a property value
+    with pytest.raises(IntegrityError):
+        catalog.create_namespace(namespace=test_namespace, properties={"key": None})
+
+
+@pytest.mark.parametrize("empty_namespace", ["", (), (""), ("", ""), " ", (" ")])
+def test_create_namespace_with_empty_identifier(catalog: Catalog, empty_namespace: Any) -> None:
+    """Empty or blank namespace identifiers are rejected with ``NoSuchNamespaceError``."""
+    # NOTE(review): ("") and (" ") are plain strings, not 1-tuples, so they repeat the "" and " " cases -- confirm intent.
+    with pytest.raises(NoSuchNamespaceError):
+        catalog.create_namespace(empty_namespace)
+
+
+# Get namespace tests
+
+
+def test_get_namespace_metadata_raises_error_when_namespace_does_not_exist(catalog: Catalog, test_namespace: Identifier) -> None:
+    """Loading properties of a namespace that was never created raises ``NoSuchNamespaceError`` naming it."""
+    dotted_name = ".".join(test_namespace)
+    expected_message = f"Namespace {dotted_name} does not exists"
+    with pytest.raises(NoSuchNamespaceError, match=expected_message):
+        catalog.load_namespace_properties(test_namespace)
+
+
+def test_namespace_exists(catalog: Catalog) -> None:
+    """``_namespace_exists`` reports created namespaces and their ancestors, and treats ``_`` and ``%`` literally."""
+    created_namespaces = (("db1",), ("db1", "ns1"), ("db2", "ns1"), ("db3", "ns1", "ns2"))
+    for created in created_namespaces:
+        catalog.create_namespace(created)
+        assert catalog._namespace_exists(created)  # type: ignore[attr-defined]
+
+    # `db2` exists because `db2.ns1` exists
+    assert catalog._namespace_exists("db2")  # type: ignore[attr-defined]
+    # `db3.ns1` exists because `db3.ns1.ns2` exists
+    assert catalog._namespace_exists("db3.ns1")  # type: ignore[attr-defined]
+    # make sure '_' is escaped in the query
+    assert not catalog._namespace_exists("db_")  # type: ignore[attr-defined]
+    # make sure '%' is escaped in the query
+    assert not catalog._namespace_exists("db%")  # type: ignore[attr-defined]
+
+
+# Namespace properties
+
+
+def test_load_namespace_properties(catalog: Catalog, test_namespace: Identifier) -> None:
+    """Properties supplied at namespace creation are returned unchanged by ``load_namespace_properties``."""
+    warehouse_location = "/test/location"
+    test_properties = {
+        "comment": "this is a test description",
+        "location": f"{warehouse_location}/{test_namespace}",
+        "test_property1": "1",
+        "test_property2": "2",
+        "test_property3": "3",
+    }
+
+    catalog.create_namespace(test_namespace, test_properties)
+    listed_properties = catalog.load_namespace_properties(test_namespace)
+    # Compare whole mappings: looping over only the loaded properties would pass vacuously if nothing was returned.
+    assert listed_properties == test_properties
+
+
+def test_load_namespace_properties_non_existing_namespace(catalog: Catalog) -> None:
+    """Loading properties for an unknown namespace raises ``NoSuchNamespaceError``."""
+    missing_namespace = "does_not_exist"
+    with pytest.raises(NoSuchNamespaceError):
+        catalog.load_namespace_properties(missing_namespace)
+
+
+def test_load_empty_namespace_properties(catalog: Catalog, test_namespace: Identifier) -> None:
+    """A namespace created without properties loads as ``{"exists": "true"}``."""
+    catalog.create_namespace(test_namespace)
+    assert catalog.load_namespace_properties(test_namespace) == {"exists": "true"}
+
+
+# List namespaces tests
+
+
+def test_list_namespaces(catalog: Catalog) -> None:
+    """``list_namespaces`` returns top-level namespaces, or the direct children of the namespace passed in."""
+    to_create = ("db", "db.ns1", "db.ns1.ns2", "db.ns2", "db2", "db2.ns1", "db%")
+    for name in to_create:
+        already_there = catalog._namespace_exists(name)  # type: ignore[attr-defined]
+        if not already_there:
+            catalog.create_namespace(name)
+
+    top_level = catalog.list_namespaces()
+    assert all(expected in top_level for expected in (("db",), ("db%",), ("db2",)))
+
+    children_of_db = catalog.list_namespaces("db")
+    assert sorted(children_of_db) == [("db", "ns1"), ("db", "ns2")]
+
+    children_of_ns1 = catalog.list_namespaces("db.ns1")
+    assert sorted(children_of_ns1) == [("db", "ns1", "ns2")]
+
+    # a leaf namespace has no children
+    assert len(catalog.list_namespaces("db.ns1.ns2")) == 0
+
+
+def test_list_namespaces_fuzzy_match(catalog: Catalog) -> None:
+    """Listing children of a namespace does not pick up similarly named namespaces such as ``db.ns1X`` or ``db2``."""
+    to_create = ("db.ns1", "db.ns1.ns2", "db.ns2", "db.ns1X.ns3", "db_.ns1.ns2", "db2.ns1.ns2")
+    for name in to_create:
+        already_there = catalog._namespace_exists(name)  # type: ignore[attr-defined]
+        if not already_there:
+            catalog.create_namespace(name)
+
+    children_of_db_ns1 = catalog.list_namespaces("db.ns1")
+    assert children_of_db_ns1 == [("db", "ns1", "ns2")]
+
+    children_of_db_underscore_ns1 = catalog.list_namespaces("db_.ns1")
+    assert children_of_db_underscore_ns1 == [("db_", "ns1", "ns2")]
+
+
+def test_list_non_existing_namespaces(catalog: Catalog) -> None:
+    """Listing the children of an unknown namespace raises ``NoSuchNamespaceError``."""
+    missing_namespace = "does_not_exist"
+    with pytest.raises(NoSuchNamespaceError):
+        catalog.list_namespaces(missing_namespace)
+
+
+# Update namespace properties tests
+
+
+def test_update_namespace_properties(catalog: Catalog, test_namespace: Identifier) -> None:
+    """Removals and updates applied together are reported per key and reflected in the stored properties."""
+    warehouse_location = "/test/location"
+    namespace_location = f"{warehouse_location}/{test_namespace}"
+    initial_properties = {
+        "comment": "this is a test description",
+        "location": namespace_location,
+        "test_property1": "1",
+        "test_property2": "2",
+        "test_property3": "3",
+    }
+    absent_key = "should_not_removed"
+    removals = {"test_property1", "test_property2", "test_property3", absent_key}
+    updates = {"test_property4": "4", "test_property5": "5", "comment": "updated test description"}
+
+    catalog.create_namespace(test_namespace, initial_properties)
+    update_report = catalog.update_namespace_properties(test_namespace, removals, updates)
+
+    for updated_key in updates:
+        assert updated_key in update_report.updated
+    # a key that was never set is reported as missing rather than removed
+    assert absent_key in update_report.missing
+    for removed_key in removals - {absent_key}:
+        assert removed_key in update_report.removed
+
+    assert catalog.load_namespace_properties(test_namespace) == {
+        "comment": "updated test description",
+        "test_property4": "4",
+        "test_property5": "5",
+        "location": namespace_location,
+    }
+
+
+def test_update_namespace_metadata_raises_error_when_namespace_does_not_exist(
+    catalog: Catalog, test_namespace: Identifier, test_table_properties: dict[str, str]
+) -> None:
+    """Updating properties of a namespace that was never created raises ``NoSuchNamespaceError`` naming it."""
+    dotted_name = ".".join(test_namespace)
+    expected_message = f"Namespace {dotted_name} does not exists"
+    with pytest.raises(NoSuchNamespaceError, match=expected_message):
+        catalog.update_namespace_properties(test_namespace, updates=test_table_properties)
+
+
+def test_update_namespace_metadata(catalog: Catalog, test_namespace: Identifier, test_table_properties: dict[str, str]) -> None:
+    """An update-only call adds the new keys and reports nothing removed or missing."""
+    additions = {"key3": "value3", "key4": "value4"}
+    catalog.create_namespace(test_namespace, test_table_properties)
+
+    summary = catalog.update_namespace_properties(test_namespace, updates=additions)
+
+    assert catalog._namespace_exists(test_namespace)  # type: ignore[attr-defined]
+    stored = catalog.load_namespace_properties(test_namespace)
+    assert additions.items() <= stored.items()
+    assert summary.removed == []
+    assert sorted(summary.updated) == ["key3", "key4"]
+    assert summary.missing == []
+
+
+def test_update_namespace_metadata_removals(
+    catalog: Catalog, test_namespace: Identifier, test_table_properties: dict[str, str]
+) -> None:
+    """A combined remove-and-update call drops ``key1``, adds the new keys, and reports both."""
+    additions = {"key3": "value3", "key4": "value4"}
+    keys_to_remove = {"key1"}
+    catalog.create_namespace(test_namespace, test_table_properties)
+
+    summary = catalog.update_namespace_properties(test_namespace, keys_to_remove, additions)
+
+    assert catalog._namespace_exists(test_namespace)  # type: ignore[attr-defined]
+    stored_after_update = catalog.load_namespace_properties(test_namespace)
+    assert additions.items() <= stored_after_update.items()
+    remaining_keys = catalog.load_namespace_properties(test_namespace).keys()
+    assert keys_to_remove.isdisjoint(remaining_keys)
+    assert summary.removed == ["key1"]
+    assert sorted(summary.updated) == ["key3", "key4"]
+    assert summary.missing == []
+
+
+# Drop namespace tests
+
+
+def test_drop_namespace(catalog: Catalog, table_schema_nested: Schema, test_table_identifier: Identifier) -> None:
+    """A namespace holding a table cannot be dropped; once the table is dropped the namespace can be."""
+    parent = Catalog.namespace_from(test_table_identifier)
+    catalog.create_namespace(parent)
+    assert catalog._namespace_exists(parent)  # type: ignore[attr-defined]
+
+    catalog.create_table(test_table_identifier, table_schema_nested)
+    with pytest.raises(NamespaceNotEmptyError):
+        catalog.drop_namespace(parent)
+
+    # empty the namespace, then drop it
+    catalog.drop_table(test_table_identifier)
+    catalog.drop_namespace(parent)
+    assert not catalog._namespace_exists(parent)  # type: ignore[attr-defined]
+
+
+def test_drop_namespace_raises_error_when_namespace_does_not_exist(catalog: Catalog) -> None:
+    """Dropping an unknown namespace raises ``NoSuchNamespaceError``."""
+    missing_namespace = "does_not_exist"
+    with pytest.raises(NoSuchNamespaceError):
+        catalog.drop_namespace(missing_namespace)
+
+
+def test_drop_namespace_raises_error_when_namespace_not_empty(
+    catalog: Catalog, table_schema_nested: Schema, test_table_identifier: Identifier
+) -> None:
+    """Dropping a namespace that still holds a table raises ``NamespaceNotEmptyError`` naming the namespace."""
+    parent = Catalog.namespace_from(test_table_identifier)
+    catalog.create_namespace(parent)
+    catalog.create_table(test_table_identifier, table_schema_nested)
+
+    dotted_name = ".".join(parent)
+    expected_message = f"Namespace {dotted_name} is not empty"
+    with pytest.raises(NamespaceNotEmptyError, match=expected_message):
+        catalog.drop_namespace(parent)
diff --git a/tests/catalog/test_sql.py b/tests/catalog/test_sql.py
index 22b9883c..f6846195 100644
--- a/tests/catalog/test_sql.py
+++ b/tests/catalog/test_sql.py
@@ -15,22 +15,15 @@
 # specific language governing permissions and limitations
 # under the License.
 
-import os
 from collections.abc import Generator
 from pathlib import Path
-from typing import Any, cast
+from typing import cast
 
-import pyarrow as pa
 import pytest
-from pydantic_core import ValidationError
-from pytest_lazyfixture import lazy_fixture
 from sqlalchemy import Engine, create_engine, inspect
-from sqlalchemy.exc import ArgumentError, IntegrityError
+from sqlalchemy.exc import ArgumentError
 
-from pyiceberg.catalog import (
-    Catalog,
-    load_catalog,
-)
+from pyiceberg.catalog import load_catalog
 from pyiceberg.catalog.sql import (
     DEFAULT_ECHO_VALUE,
     DEFAULT_POOL_PRE_PING_VALUE,
@@ -39,29 +32,11 @@ from pyiceberg.catalog.sql import (
     SqlCatalogBaseTable,
 )
 from pyiceberg.exceptions import (
-    CommitFailedException,
-    NamespaceAlreadyExistsError,
-    NamespaceNotEmptyError,
-    NoSuchNamespaceError,
     NoSuchPropertyException,
-    NoSuchTableError,
     TableAlreadyExistsError,
 )
-from pyiceberg.io import FSSPEC_FILE_IO, PY_IO_IMPL
-from pyiceberg.io.pyarrow import _dataframe_to_data_files, schema_to_pyarrow
-from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC
 from pyiceberg.schema import Schema
-from pyiceberg.table import TableProperties
-from pyiceberg.table.snapshots import Operation
-from pyiceberg.table.sorting import (
-    NullOrder,
-    SortDirection,
-    SortField,
-    SortOrder,
-)
-from pyiceberg.transforms import IdentityTransform
-from pyiceberg.typedef import Identifier
-from pyiceberg.types import IntegerType, NestedField, StringType, strtobool
+from pyiceberg.types import NestedField, StringType, strtobool
 
 CATALOG_TABLES = [c.__tablename__ for c in 
SqlCatalogBaseTable.__subclasses__()]
 
@@ -71,36 +46,6 @@ def catalog_name() -> str:
     return "test_sql_catalog"
 
 
-@pytest.fixture(name="random_table_identifier")
-def fixture_random_table_identifier(warehouse: Path, database_name: str, 
table_name: str) -> Identifier:
-    os.makedirs(f"{warehouse}/{database_name}/{table_name}/metadata/", 
exist_ok=True)
-    return database_name, table_name
-
-
-@pytest.fixture(name="another_random_table_identifier")
-def fixture_another_random_table_identifier(warehouse: Path, database_name: 
str, table_name: str) -> Identifier:
-    database_name = database_name + "_new"
-    table_name = table_name + "_new"
-    os.makedirs(f"{warehouse}/{database_name}/{table_name}/metadata/", 
exist_ok=True)
-    return database_name, table_name
-
-
-@pytest.fixture(name="random_hierarchical_identifier")
-def fixture_random_hierarchical_identifier(warehouse: Path, 
hierarchical_namespace_name: str, table_name: str) -> Identifier:
-    
os.makedirs(f"{warehouse}/{hierarchical_namespace_name}/{table_name}/metadata/",
 exist_ok=True)
-    return Catalog.identifier_to_tuple(".".join((hierarchical_namespace_name, 
table_name)))
-
-
-@pytest.fixture(name="another_random_hierarchical_identifier")
-def fixture_another_random_hierarchical_identifier(
-    warehouse: Path, hierarchical_namespace_name: str, table_name: str
-) -> Identifier:
-    hierarchical_namespace_name = hierarchical_namespace_name + "_new"
-    table_name = table_name + "_new"
-    
os.makedirs(f"{warehouse}/{hierarchical_namespace_name}/{table_name}/metadata/",
 exist_ok=True)
-    return Catalog.identifier_to_tuple(".".join((hierarchical_namespace_name, 
table_name)))
-
-
 @pytest.fixture(scope="module")
 def catalog_memory(catalog_name: str, warehouse: Path) -> 
Generator[SqlCatalog, None, None]:
     props = {
@@ -135,32 +80,6 @@ def alchemy_engine(catalog_uri: str) -> Engine:
     return create_engine(catalog_uri)
 
 
-@pytest.fixture(scope="module")
-def catalog_sqlite_without_rowcount(catalog_name: str, warehouse: Path) -> 
Generator[SqlCatalog, None, None]:
-    props = {
-        "uri": f"sqlite:////{warehouse}/sql-catalog",
-        "warehouse": f"file://{warehouse}",
-    }
-    catalog = SqlCatalog(catalog_name, **props)
-    catalog.engine.dialect.supports_sane_rowcount = False
-    catalog.create_tables()
-    yield catalog
-    catalog.destroy_tables()
-
-
-@pytest.fixture(scope="module")
-def catalog_sqlite_fsspec(catalog_name: str, warehouse: Path) -> 
Generator[SqlCatalog, None, None]:
-    props = {
-        "uri": f"sqlite:////{warehouse}/sql-catalog",
-        "warehouse": f"file://{warehouse}",
-        PY_IO_IMPL: FSSPEC_FILE_IO,
-    }
-    catalog = SqlCatalog(catalog_name, **props)
-    catalog.create_tables()
-    yield catalog
-    catalog.destroy_tables()
-
-
 def test_creation_with_no_uri(catalog_name: str) -> None:
     with pytest.raises(NoSuchPropertyException):
         SqlCatalog(catalog_name, not_uri="unused")
@@ -291,1426 +210,6 @@ def test_creation_when_all_tables_exists(alchemy_engine: 
Engine, catalog_name: s
     confirm_all_tables_exist(catalog)
 
 
-@pytest.mark.parametrize(
-    "catalog",
-    [
-        lazy_fixture("catalog_memory"),
-        lazy_fixture("catalog_sqlite"),
-    ],
-)
-def test_create_tables_idempotency(catalog: SqlCatalog) -> None:
-    # Second initialization should not fail even if tables are already created
-    catalog.create_tables()
-    catalog.create_tables()
-
-
-@pytest.mark.parametrize(
-    "catalog",
-    [
-        lazy_fixture("catalog_memory"),
-        lazy_fixture("catalog_sqlite"),
-    ],
-)
-@pytest.mark.parametrize(
-    "table_identifier",
-    [
-        lazy_fixture("random_table_identifier"),
-        lazy_fixture("random_hierarchical_identifier"),
-    ],
-)
-def test_create_table_default_sort_order(catalog: SqlCatalog, 
table_schema_nested: Schema, table_identifier: Identifier) -> None:
-    namespace = Catalog.namespace_from(table_identifier)
-    catalog.create_namespace(namespace)
-    table = catalog.create_table(table_identifier, table_schema_nested)
-    assert table.sort_order().order_id == 0, "Order ID must match"
-    assert table.sort_order().is_unsorted is True, "Order must be unsorted"
-    catalog.drop_table(table_identifier)
-
-
-@pytest.mark.parametrize(
-    "catalog",
-    [
-        lazy_fixture("catalog_memory"),
-        lazy_fixture("catalog_sqlite"),
-    ],
-)
-@pytest.mark.parametrize(
-    "table_identifier",
-    [
-        lazy_fixture("random_table_identifier"),
-        lazy_fixture("random_hierarchical_identifier"),
-    ],
-)
-def test_create_v1_table(catalog: SqlCatalog, table_schema_nested: Schema, 
table_identifier: Identifier) -> None:
-    namespace = Catalog.namespace_from(table_identifier)
-    catalog.create_namespace(namespace)
-    table = catalog.create_table(table_identifier, table_schema_nested, 
properties={"format-version": "1"})
-    assert table.sort_order().order_id == 0, "Order ID must match"
-    assert table.sort_order().is_unsorted is True, "Order must be unsorted"
-    assert table.format_version == 1
-    assert table.spec() == UNPARTITIONED_PARTITION_SPEC
-    catalog.drop_table(table_identifier)
-
-
-@pytest.mark.parametrize(
-    "catalog",
-    [
-        lazy_fixture("catalog_memory"),
-        lazy_fixture("catalog_sqlite"),
-    ],
-)
-@pytest.mark.parametrize(
-    "table_identifier",
-    [
-        lazy_fixture("random_table_identifier"),
-        lazy_fixture("random_hierarchical_identifier"),
-    ],
-)
-def test_create_table_with_pyarrow_schema(
-    catalog: SqlCatalog,
-    pyarrow_schema_simple_without_ids: pa.Schema,
-    iceberg_table_schema_simple: Schema,
-    table_identifier: Identifier,
-) -> None:
-    namespace = Catalog.namespace_from(table_identifier)
-    catalog.create_namespace(namespace)
-    table = catalog.create_table(table_identifier, 
pyarrow_schema_simple_without_ids)
-    assert table.schema() == iceberg_table_schema_simple
-    catalog.drop_table(table_identifier)
-
-
-@pytest.mark.parametrize(
-    "catalog",
-    [
-        lazy_fixture("catalog_memory"),
-        lazy_fixture("catalog_sqlite"),
-    ],
-)
-@pytest.mark.parametrize(
-    "table_identifier",
-    [
-        lazy_fixture("random_table_identifier"),
-        lazy_fixture("random_hierarchical_identifier"),
-    ],
-)
-def test_write_pyarrow_schema(catalog: SqlCatalog, table_identifier: 
Identifier) -> None:
-    import pyarrow as pa
-
-    pyarrow_table = pa.Table.from_arrays(
-        [
-            pa.array([None, "A", "B", "C"]),  # 'foo' column
-            pa.array([1, 2, 3, 4]),  # 'bar' column
-            pa.array([True, None, False, True]),  # 'baz' column
-            pa.array([None, "A", "B", "C"]),  # 'large' column
-        ],
-        schema=pa.schema(
-            [
-                pa.field("foo", pa.large_string(), nullable=True),
-                pa.field("bar", pa.int32(), nullable=False),
-                pa.field("baz", pa.bool_(), nullable=True),
-                pa.field("large", pa.large_string(), nullable=True),
-            ]
-        ),
-    )
-    namespace = Catalog.namespace_from(table_identifier)
-    catalog.create_namespace(namespace)
-    table = catalog.create_table(table_identifier, pyarrow_table.schema)
-    table.append(pyarrow_table)
-
-
-@pytest.mark.parametrize(
-    "catalog",
-    [
-        lazy_fixture("catalog_memory"),
-        lazy_fixture("catalog_sqlite"),
-    ],
-)
-@pytest.mark.parametrize(
-    "table_identifier",
-    [
-        lazy_fixture("random_table_identifier"),
-        lazy_fixture("random_hierarchical_identifier"),
-    ],
-)
-def test_create_table_custom_sort_order(catalog: SqlCatalog, 
table_schema_nested: Schema, table_identifier: Identifier) -> None:
-    namespace = Catalog.namespace_from(table_identifier)
-    catalog.create_namespace(namespace)
-    order = SortOrder(SortField(source_id=2, transform=IdentityTransform(), 
null_order=NullOrder.NULLS_FIRST))
-    table = catalog.create_table(table_identifier, table_schema_nested, 
sort_order=order)
-    given_sort_order = table.sort_order()
-    assert given_sort_order.order_id == 1, "Order ID must match"
-    assert len(given_sort_order.fields) == 1, "Order must have 1 field"
-    assert given_sort_order.fields[0].direction == SortDirection.ASC, 
"Direction must match"
-    assert given_sort_order.fields[0].null_order == NullOrder.NULLS_FIRST, 
"Null order must match"
-    assert isinstance(given_sort_order.fields[0].transform, 
IdentityTransform), "Transform must match"
-    catalog.drop_table(table_identifier)
-
-
-@pytest.mark.parametrize(
-    "catalog",
-    [
-        lazy_fixture("catalog_memory"),
-        lazy_fixture("catalog_sqlite"),
-    ],
-)
-@pytest.mark.parametrize(
-    "table_identifier",
-    [
-        lazy_fixture("random_table_identifier"),
-        lazy_fixture("random_hierarchical_identifier"),
-    ],
-)
-def test_create_table_with_default_warehouse_location(
-    warehouse: Path, catalog: SqlCatalog, table_schema_nested: Schema, 
table_identifier: Identifier
-) -> None:
-    identifier_tuple = Catalog.identifier_to_tuple(table_identifier)
-    namespace = Catalog.namespace_from(table_identifier)
-    catalog.create_namespace(namespace)
-    catalog.create_table(table_identifier, table_schema_nested)
-    table = catalog.load_table(table_identifier)
-    assert table.name() == identifier_tuple
-    assert table.metadata_location.startswith(f"file://{warehouse}")
-    assert os.path.exists(table.metadata_location[len("file://") :])
-    catalog.drop_table(table_identifier)
-
-
-@pytest.mark.parametrize(
-    "catalog",
-    [
-        lazy_fixture("catalog_memory"),
-        lazy_fixture("catalog_sqlite"),
-    ],
-)
-@pytest.mark.parametrize(
-    "table_identifier",
-    [
-        lazy_fixture("random_table_identifier"),
-        lazy_fixture("random_hierarchical_identifier"),
-    ],
-)
-def test_create_table_with_given_location_removes_trailing_slash(
-    warehouse: Path, catalog: SqlCatalog, table_schema_nested: Schema, 
table_identifier: Identifier
-) -> None:
-    identifier_tuple = Catalog.identifier_to_tuple(table_identifier)
-    namespace = Catalog.namespace_from(table_identifier)
-    table_name = Catalog.table_name_from(identifier_tuple)
-    location = f"file://{warehouse}/{catalog.name}/{table_name}-given"
-    catalog.create_namespace(namespace)
-    catalog.create_table(table_identifier, table_schema_nested, 
location=f"{location}/")
-    table = catalog.load_table(table_identifier)
-    assert table.name() == identifier_tuple
-    assert table.metadata_location.startswith(f"file://{warehouse}")
-    assert os.path.exists(table.metadata_location[len("file://") :])
-    assert table.location() == location
-    catalog.drop_table(table_identifier)
-
-
[email protected](
-    "catalog",
-    [
-        lazy_fixture("catalog_memory"),
-        lazy_fixture("catalog_sqlite"),
-    ],
-)
[email protected](
-    "table_identifier",
-    [
-        lazy_fixture("random_table_identifier"),
-        lazy_fixture("random_hierarchical_identifier"),
-    ],
-)
-def test_create_duplicated_table(catalog: SqlCatalog, table_schema_nested: 
Schema, table_identifier: Identifier) -> None:
-    namespace = Catalog.namespace_from(table_identifier)
-    catalog.create_namespace(namespace)
-    catalog.create_table(table_identifier, table_schema_nested)
-    with pytest.raises(TableAlreadyExistsError):
-        catalog.create_table(table_identifier, table_schema_nested)
-
-
[email protected](
-    "catalog",
-    [
-        lazy_fixture("catalog_memory"),
-        lazy_fixture("catalog_sqlite"),
-    ],
-)
[email protected](
-    "table_identifier",
-    [
-        lazy_fixture("random_table_identifier"),
-        lazy_fixture("random_hierarchical_identifier"),
-    ],
-)
-def test_create_table_if_not_exists_duplicated_table(
-    catalog: SqlCatalog, table_schema_nested: Schema, table_identifier: 
Identifier
-) -> None:
-    namespace = Catalog.namespace_from(table_identifier)
-    catalog.create_namespace(namespace)
-    table1 = catalog.create_table(table_identifier, table_schema_nested)
-    table2 = catalog.create_table_if_not_exists(table_identifier, 
table_schema_nested)
-    assert table1.name() == table2.name()
-
-
[email protected](
-    "catalog",
-    [
-        lazy_fixture("catalog_memory"),
-        lazy_fixture("catalog_sqlite"),
-    ],
-)
-def test_create_table_with_non_existing_namespace(catalog: SqlCatalog, 
table_schema_nested: Schema, table_name: str) -> None:
-    identifier = ("invalid", table_name)
-    with pytest.raises(NoSuchNamespaceError):
-        catalog.create_table(identifier, table_schema_nested)
-
-
[email protected](
-    "catalog",
-    [
-        lazy_fixture("catalog_memory"),
-        lazy_fixture("catalog_sqlite"),
-    ],
-)
-def test_create_table_without_namespace(catalog: SqlCatalog, 
table_schema_nested: Schema, table_name: str) -> None:
-    with pytest.raises(NoSuchNamespaceError):
-        catalog.create_table(table_name, table_schema_nested)
-
-
[email protected](
-    "catalog",
-    [
-        lazy_fixture("catalog_memory"),
-        lazy_fixture("catalog_sqlite"),
-    ],
-)
[email protected](
-    "table_identifier",
-    [
-        lazy_fixture("random_table_identifier"),
-        lazy_fixture("random_hierarchical_identifier"),
-    ],
-)
-def test_register_table(catalog: SqlCatalog, table_identifier: Identifier, 
metadata_location: str) -> None:
-    identifier_tuple = Catalog.identifier_to_tuple(table_identifier)
-    namespace = Catalog.namespace_from(table_identifier)
-    catalog.create_namespace(namespace)
-    table = catalog.register_table(table_identifier, metadata_location)
-    assert table.name() == identifier_tuple
-    assert table.metadata_location == metadata_location
-    assert os.path.exists(metadata_location)
-    catalog.drop_table(table_identifier)
-
-
[email protected](
-    "catalog",
-    [
-        lazy_fixture("catalog_memory"),
-        lazy_fixture("catalog_sqlite"),
-    ],
-)
[email protected](
-    "table_identifier",
-    [
-        lazy_fixture("random_table_identifier"),
-        lazy_fixture("random_hierarchical_identifier"),
-    ],
-)
-def test_register_existing_table(catalog: SqlCatalog, table_identifier: 
Identifier, metadata_location: str) -> None:
-    namespace = Catalog.namespace_from(table_identifier)
-    catalog.create_namespace(namespace)
-    catalog.register_table(table_identifier, metadata_location)
-    with pytest.raises(TableAlreadyExistsError):
-        catalog.register_table(table_identifier, metadata_location)
-
-
[email protected](
-    "catalog",
-    [
-        lazy_fixture("catalog_memory"),
-        lazy_fixture("catalog_sqlite"),
-    ],
-)
-def test_register_table_with_non_existing_namespace(catalog: SqlCatalog, 
metadata_location: str, table_name: str) -> None:
-    identifier = ("invalid", table_name)
-    with pytest.raises(NoSuchNamespaceError):
-        catalog.register_table(identifier, metadata_location)
-
-
[email protected](
-    "catalog",
-    [
-        lazy_fixture("catalog_memory"),
-        lazy_fixture("catalog_sqlite"),
-    ],
-)
-def test_register_table_without_namespace(catalog: SqlCatalog, 
metadata_location: str, table_name: str) -> None:
-    with pytest.raises(ValueError):
-        catalog.register_table(table_name, metadata_location)
-
-
[email protected](
-    "catalog",
-    [
-        lazy_fixture("catalog_memory"),
-        lazy_fixture("catalog_sqlite"),
-    ],
-)
[email protected](
-    "table_identifier",
-    [
-        lazy_fixture("random_table_identifier"),
-        lazy_fixture("random_hierarchical_identifier"),
-    ],
-)
-def test_load_table(catalog: SqlCatalog, table_schema_nested: Schema, 
table_identifier: Identifier) -> None:
-    namespace = Catalog.namespace_from(table_identifier)
-    catalog.create_namespace(namespace)
-    table = catalog.create_table(table_identifier, table_schema_nested)
-    loaded_table = catalog.load_table(table_identifier)
-    assert table.name() == loaded_table.name()
-    assert table.metadata_location == loaded_table.metadata_location
-    assert table.metadata == loaded_table.metadata
-
-
[email protected](
-    "catalog",
-    [
-        lazy_fixture("catalog_memory"),
-        lazy_fixture("catalog_sqlite"),
-    ],
-)
[email protected](
-    "table_identifier",
-    [
-        lazy_fixture("random_table_identifier"),
-        lazy_fixture("random_hierarchical_identifier"),
-    ],
-)
-def test_load_table_from_self_identifier(catalog: SqlCatalog, 
table_schema_nested: Schema, table_identifier: Identifier) -> None:
-    identifier_tuple = Catalog.identifier_to_tuple(table_identifier)
-    namespace = Catalog.namespace_from(table_identifier)
-    catalog.create_namespace(namespace)
-    table = catalog.create_table(table_identifier, table_schema_nested)
-    intermediate = catalog.load_table(table_identifier)
-    assert intermediate.name() == identifier_tuple
-    loaded_table = catalog.load_table(intermediate.name())
-    assert table.name() == loaded_table.name()
-    assert table.metadata_location == loaded_table.metadata_location
-    assert table.metadata == loaded_table.metadata
-
-
[email protected](
-    "catalog",
-    [
-        lazy_fixture("catalog_memory"),
-        lazy_fixture("catalog_sqlite"),
-        lazy_fixture("catalog_sqlite_without_rowcount"),
-    ],
-)
[email protected](
-    "table_identifier",
-    [
-        lazy_fixture("random_table_identifier"),
-        lazy_fixture("random_hierarchical_identifier"),
-    ],
-)
-def test_drop_table(catalog: SqlCatalog, table_schema_nested: Schema, 
table_identifier: Identifier) -> None:
-    identifier_tuple = Catalog.identifier_to_tuple(table_identifier)
-    namespace = Catalog.namespace_from(table_identifier)
-    catalog.create_namespace(namespace)
-    table = catalog.create_table(table_identifier, table_schema_nested)
-    assert table.name() == identifier_tuple
-    catalog.drop_table(table_identifier)
-    with pytest.raises(NoSuchTableError):
-        catalog.load_table(table_identifier)
-
-
[email protected](
-    "catalog",
-    [
-        lazy_fixture("catalog_memory"),
-        lazy_fixture("catalog_sqlite"),
-        lazy_fixture("catalog_sqlite_without_rowcount"),
-    ],
-)
[email protected](
-    "table_identifier",
-    [
-        lazy_fixture("random_table_identifier"),
-        lazy_fixture("random_hierarchical_identifier"),
-    ],
-)
-def test_drop_table_from_self_identifier(catalog: SqlCatalog, 
table_schema_nested: Schema, table_identifier: Identifier) -> None:
-    identifier_tuple = Catalog.identifier_to_tuple(table_identifier)
-    namespace = Catalog.namespace_from(table_identifier)
-    catalog.create_namespace(namespace)
-    table = catalog.create_table(table_identifier, table_schema_nested)
-    assert table.name() == identifier_tuple
-    catalog.drop_table(table.name())
-    with pytest.raises(NoSuchTableError):
-        catalog.load_table(table.name())
-    with pytest.raises(NoSuchTableError):
-        catalog.load_table(table_identifier)
-
-
[email protected](
-    "catalog",
-    [
-        lazy_fixture("catalog_memory"),
-        lazy_fixture("catalog_sqlite"),
-        lazy_fixture("catalog_sqlite_without_rowcount"),
-    ],
-)
[email protected](
-    "table_identifier",
-    [
-        lazy_fixture("random_table_identifier"),
-        lazy_fixture("random_hierarchical_identifier"),
-    ],
-)
-def test_drop_table_that_does_not_exist(catalog: SqlCatalog, table_identifier: 
Identifier) -> None:
-    with pytest.raises(NoSuchTableError):
-        catalog.drop_table(table_identifier)
-
-
[email protected](
-    "catalog",
-    [
-        lazy_fixture("catalog_memory"),
-        lazy_fixture("catalog_sqlite"),
-        lazy_fixture("catalog_sqlite_without_rowcount"),
-    ],
-)
[email protected](
-    "from_table_identifier",
-    [
-        lazy_fixture("random_table_identifier"),
-        lazy_fixture("random_hierarchical_identifier"),
-    ],
-)
[email protected](
-    "to_table_identifier",
-    [
-        lazy_fixture("another_random_table_identifier"),
-        lazy_fixture("another_random_hierarchical_identifier"),
-    ],
-)
-def test_rename_table(
-    catalog: SqlCatalog, table_schema_nested: Schema, from_table_identifier: 
Identifier, to_table_identifier: Identifier
-) -> None:
-    from_namespace = Catalog.namespace_from(from_table_identifier)
-    to_namespace = Catalog.namespace_from(to_table_identifier)
-    catalog.create_namespace(from_namespace)
-    catalog.create_namespace(to_namespace)
-    table = catalog.create_table(from_table_identifier, table_schema_nested)
-    assert table.name() == from_table_identifier
-    catalog.rename_table(from_table_identifier, to_table_identifier)
-    new_table = catalog.load_table(to_table_identifier)
-    assert new_table.name() == to_table_identifier
-    assert new_table.metadata_location == table.metadata_location
-    with pytest.raises(NoSuchTableError):
-        catalog.load_table(from_table_identifier)
-
-
[email protected](
-    "catalog",
-    [
-        lazy_fixture("catalog_memory"),
-        lazy_fixture("catalog_sqlite"),
-        lazy_fixture("catalog_sqlite_without_rowcount"),
-    ],
-)
[email protected](
-    "from_table_identifier",
-    [
-        lazy_fixture("random_table_identifier"),
-        lazy_fixture("random_hierarchical_identifier"),
-    ],
-)
[email protected](
-    "to_table_identifier",
-    [
-        lazy_fixture("another_random_table_identifier"),
-        lazy_fixture("another_random_hierarchical_identifier"),
-    ],
-)
-def test_rename_table_from_self_identifier(
-    catalog: SqlCatalog, table_schema_nested: Schema, from_table_identifier: 
Identifier, to_table_identifier: Identifier
-) -> None:
-    from_namespace = Catalog.namespace_from(from_table_identifier)
-    to_namespace = Catalog.namespace_from(to_table_identifier)
-    catalog.create_namespace(from_namespace)
-    catalog.create_namespace(to_namespace)
-    table = catalog.create_table(from_table_identifier, table_schema_nested)
-    assert table.name() == from_table_identifier
-    catalog.rename_table(table.name(), to_table_identifier)
-    new_table = catalog.load_table(to_table_identifier)
-    assert new_table.name() == to_table_identifier
-    assert new_table.metadata_location == table.metadata_location
-    with pytest.raises(NoSuchTableError):
-        catalog.load_table(table.name())
-    with pytest.raises(NoSuchTableError):
-        catalog.load_table(from_table_identifier)
-
-
[email protected](
-    "catalog",
-    [
-        lazy_fixture("catalog_memory"),
-        lazy_fixture("catalog_sqlite"),
-        lazy_fixture("catalog_sqlite_without_rowcount"),
-    ],
-)
[email protected](
-    "from_table_identifier",
-    [
-        lazy_fixture("random_table_identifier"),
-        lazy_fixture("random_hierarchical_identifier"),
-    ],
-)
[email protected](
-    "to_table_identifier",
-    [
-        lazy_fixture("another_random_table_identifier"),
-        lazy_fixture("another_random_hierarchical_identifier"),
-    ],
-)
-def test_rename_table_to_existing_one(
-    catalog: SqlCatalog, table_schema_nested: Schema, from_table_identifier: 
Identifier, to_table_identifier: Identifier
-) -> None:
-    from_namespace = Catalog.namespace_from(from_table_identifier)
-    to_namespace = Catalog.namespace_from(to_table_identifier)
-    catalog.create_namespace(from_namespace)
-    catalog.create_namespace(to_namespace)
-    table = catalog.create_table(from_table_identifier, table_schema_nested)
-    assert table.name() == from_table_identifier
-    new_table = catalog.create_table(to_table_identifier, table_schema_nested)
-    assert new_table.name() == to_table_identifier
-    with pytest.raises(TableAlreadyExistsError):
-        catalog.rename_table(from_table_identifier, to_table_identifier)
-
-
[email protected](
-    "catalog",
-    [
-        lazy_fixture("catalog_memory"),
-        lazy_fixture("catalog_sqlite"),
-        lazy_fixture("catalog_sqlite_without_rowcount"),
-    ],
-)
[email protected](
-    "from_table_identifier",
-    [
-        lazy_fixture("random_table_identifier"),
-        lazy_fixture("random_hierarchical_identifier"),
-    ],
-)
[email protected](
-    "to_table_identifier",
-    [
-        lazy_fixture("another_random_table_identifier"),
-        lazy_fixture("another_random_hierarchical_identifier"),
-    ],
-)
-def test_rename_missing_table(catalog: SqlCatalog, from_table_identifier: 
Identifier, to_table_identifier: Identifier) -> None:
-    to_namespace = Catalog.namespace_from(to_table_identifier)
-    catalog.create_namespace(to_namespace)
-    with pytest.raises(NoSuchTableError):
-        catalog.rename_table(from_table_identifier, to_table_identifier)
-
-
[email protected](
-    "catalog",
-    [
-        lazy_fixture("catalog_memory"),
-        lazy_fixture("catalog_sqlite"),
-        lazy_fixture("catalog_sqlite_without_rowcount"),
-    ],
-)
[email protected](
-    "from_table_identifier",
-    [
-        lazy_fixture("random_table_identifier"),
-        lazy_fixture("random_hierarchical_identifier"),
-    ],
-)
[email protected](
-    "to_table_identifier",
-    [
-        lazy_fixture("another_random_table_identifier"),
-        lazy_fixture("another_random_hierarchical_identifier"),
-    ],
-)
-def test_rename_table_to_missing_namespace(
-    catalog: SqlCatalog, table_schema_nested: Schema, from_table_identifier: 
Identifier, to_table_identifier: Identifier
-) -> None:
-    from_namespace = Catalog.namespace_from(from_table_identifier)
-    catalog.create_namespace(from_namespace)
-    table = catalog.create_table(from_table_identifier, table_schema_nested)
-    assert table.name() == from_table_identifier
-    with pytest.raises(NoSuchNamespaceError):
-        catalog.rename_table(from_table_identifier, to_table_identifier)
-
-
[email protected](
-    "catalog",
-    [
-        lazy_fixture("catalog_memory"),
-        lazy_fixture("catalog_sqlite"),
-    ],
-)
[email protected](
-    "table_identifier_1",
-    [
-        lazy_fixture("random_table_identifier"),
-        lazy_fixture("random_hierarchical_identifier"),
-    ],
-)
[email protected](
-    "table_identifier_2",
-    [
-        lazy_fixture("another_random_table_identifier"),
-        lazy_fixture("another_random_hierarchical_identifier"),
-    ],
-)
-def test_list_tables(
-    catalog: SqlCatalog, table_schema_nested: Schema, table_identifier_1: 
Identifier, table_identifier_2: Identifier
-) -> None:
-    namespace_1 = Catalog.namespace_from(table_identifier_1)
-    namespace_2 = Catalog.namespace_from(table_identifier_2)
-    catalog.create_namespace(namespace_1)
-    catalog.create_namespace(namespace_2)
-    catalog.create_table(table_identifier_1, table_schema_nested)
-    catalog.create_table(table_identifier_2, table_schema_nested)
-    identifier_list = catalog.list_tables(namespace_1)
-    assert len(identifier_list) == 1
-    assert table_identifier_1 in identifier_list
-
-    identifier_list = catalog.list_tables(namespace_2)
-    assert len(identifier_list) == 1
-    assert table_identifier_2 in identifier_list
-
-
[email protected](
-    "catalog",
-    [
-        lazy_fixture("catalog_memory"),
-        lazy_fixture("catalog_sqlite"),
-    ],
-)
[email protected]("namespace", [lazy_fixture("database_name"), 
lazy_fixture("hierarchical_namespace_name")])
-def test_list_tables_when_missing_namespace(catalog: SqlCatalog, namespace: 
str) -> None:
-    with pytest.raises(NoSuchNamespaceError):
-        catalog.list_tables(namespace)
-
-
[email protected](
-    "catalog",
-    [
-        lazy_fixture("catalog_memory"),
-        lazy_fixture("catalog_sqlite"),
-    ],
-)
-def test_create_namespace_if_not_exists(catalog: SqlCatalog, database_name: 
str) -> None:
-    catalog.create_namespace(database_name)
-    assert (database_name,) in catalog.list_namespaces()
-    catalog.create_namespace_if_not_exists(database_name)
-    assert (database_name,) in catalog.list_namespaces()
-
-
[email protected](
-    "catalog",
-    [
-        lazy_fixture("catalog_memory"),
-        lazy_fixture("catalog_sqlite"),
-    ],
-)
[email protected]("namespace", [lazy_fixture("database_name"), 
lazy_fixture("hierarchical_namespace_name")])
-def test_create_namespace(catalog: SqlCatalog, namespace: str) -> None:
-    catalog.create_namespace(namespace)
-    assert (Catalog.identifier_to_tuple(namespace)[:1]) in 
catalog.list_namespaces()
-
-
[email protected](
-    "catalog",
-    [
-        lazy_fixture("catalog_memory"),
-        lazy_fixture("catalog_sqlite"),
-    ],
-)
[email protected]("namespace", [lazy_fixture("database_name"), 
lazy_fixture("hierarchical_namespace_name")])
-def test_create_duplicate_namespace(catalog: SqlCatalog, namespace: str) -> 
None:
-    catalog.create_namespace(namespace)
-    with pytest.raises(NamespaceAlreadyExistsError):
-        catalog.create_namespace(namespace)
-
-
[email protected](
-    "catalog",
-    [
-        lazy_fixture("catalog_memory"),
-        lazy_fixture("catalog_sqlite"),
-    ],
-)
[email protected]("namespace", [lazy_fixture("database_name"), 
lazy_fixture("hierarchical_namespace_name")])
-def test_create_namespaces_sharing_same_prefix(catalog: SqlCatalog, namespace: 
str) -> None:
-    catalog.create_namespace(namespace + "_1")
-    # Second namespace is a prefix of the first one, make sure it can be added.
-    catalog.create_namespace(namespace)
-
-
[email protected](
-    "catalog",
-    [
-        lazy_fixture("catalog_memory"),
-        lazy_fixture("catalog_sqlite"),
-    ],
-)
[email protected]("namespace", [lazy_fixture("database_name"), 
lazy_fixture("hierarchical_namespace_name")])
-def test_create_namespace_with_comment_and_location(catalog: SqlCatalog, 
namespace: str) -> None:
-    test_location = "/test/location"
-    test_properties = {
-        "comment": "this is a test description",
-        "location": test_location,
-    }
-    catalog.create_namespace(namespace=namespace, properties=test_properties)
-    loaded_database_list = catalog.list_namespaces()
-    assert Catalog.identifier_to_tuple(namespace)[:1] in loaded_database_list
-    properties = catalog.load_namespace_properties(namespace)
-    assert properties["comment"] == "this is a test description"
-    assert properties["location"] == test_location
-
-
[email protected](
-    "catalog",
-    [
-        lazy_fixture("catalog_memory"),
-        lazy_fixture("catalog_sqlite"),
-    ],
-)
[email protected]("namespace", [lazy_fixture("database_name"), 
lazy_fixture("hierarchical_namespace_name")])
[email protected]("ignore")
-def test_create_namespace_with_null_properties(catalog: SqlCatalog, namespace: 
str) -> None:
-    with pytest.raises(IntegrityError):
-        catalog.create_namespace(namespace=namespace, properties={None: 
"value"})  # type: ignore
-
-    with pytest.raises(IntegrityError):
-        catalog.create_namespace(namespace=namespace, properties={"key": None})
-
-
[email protected](
-    "catalog",
-    [
-        lazy_fixture("catalog_memory"),
-        lazy_fixture("catalog_sqlite"),
-    ],
-)
[email protected]("empty_namespace", ["", (), (""), ("", ""), " ", (" 
")])
-def test_create_namespace_with_empty_identifier(catalog: SqlCatalog, 
empty_namespace: Any) -> None:
-    with pytest.raises(NoSuchNamespaceError):
-        catalog.create_namespace(empty_namespace)
-
-
[email protected](
-    "catalog",
-    [
-        lazy_fixture("catalog_memory"),
-        lazy_fixture("catalog_sqlite"),
-    ],
-)
-def test_namespace_exists(catalog: SqlCatalog) -> None:
-    for ns in [("db1",), ("db1", "ns1"), ("db2", "ns1"), ("db3", "ns1", 
"ns2")]:
-        catalog.create_namespace(ns)
-        assert catalog._namespace_exists(ns)
-
-    assert catalog._namespace_exists("db2")  # `db2` exists because `db2.ns1` 
exists
-    assert catalog._namespace_exists("db3.ns1")  # `db3.ns1` exists because 
`db3.ns1.ns2` exists
-    assert not catalog._namespace_exists("db_")  # make sure '_' is escaped in 
the query
-    assert not catalog._namespace_exists("db%")  # make sure '%' is escaped in 
the query
-
-
[email protected](
-    "catalog",
-    [
-        lazy_fixture("catalog_memory"),
-        lazy_fixture("catalog_sqlite"),
-    ],
-)
-def test_list_namespaces(catalog: SqlCatalog) -> None:
-    namespace_list = ["db", "db.ns1", "db.ns1.ns2", "db.ns2", "db2", 
"db2.ns1", "db%"]
-    for namespace in namespace_list:
-        if not catalog._namespace_exists(namespace):
-            catalog.create_namespace(namespace)
-
-    ns_list = catalog.list_namespaces()
-    for ns in [("db",), ("db%",), ("db2",)]:
-        assert ns in ns_list
-
-    ns_list = catalog.list_namespaces("db")
-    assert sorted(ns_list) == [("db", "ns1"), ("db", "ns2")]
-
-    ns_list = catalog.list_namespaces("db.ns1")
-    assert sorted(ns_list) == [("db", "ns1", "ns2")]
-
-    ns_list = catalog.list_namespaces("db.ns1.ns2")
-    assert len(ns_list) == 0
-
-
[email protected](
-    "catalog",
-    [
-        lazy_fixture("catalog_memory"),
-        lazy_fixture("catalog_sqlite"),
-    ],
-)
-def test_list_namespaces_fuzzy_match(catalog: SqlCatalog) -> None:
-    namespace_list = ["db.ns1", "db.ns1.ns2", "db.ns2", "db.ns1X.ns3", 
"db_.ns1.ns2", "db2.ns1.ns2"]
-    for namespace in namespace_list:
-        if not catalog._namespace_exists(namespace):
-            catalog.create_namespace(namespace)
-
-    assert catalog.list_namespaces("db.ns1") == [("db", "ns1", "ns2")]
-
-    assert catalog.list_namespaces("db_.ns1") == [("db_", "ns1", "ns2")]
-
-
[email protected](
-    "catalog",
-    [
-        lazy_fixture("catalog_memory"),
-        lazy_fixture("catalog_sqlite"),
-    ],
-)
-def test_list_non_existing_namespaces(catalog: SqlCatalog) -> None:
-    with pytest.raises(NoSuchNamespaceError):
-        catalog.list_namespaces("does_not_exist")
-
-
[email protected](
-    "catalog",
-    [
-        lazy_fixture("catalog_memory"),
-        lazy_fixture("catalog_sqlite"),
-    ],
-)
[email protected](
-    "table_identifier",
-    [
-        lazy_fixture("random_table_identifier"),
-        lazy_fixture("random_hierarchical_identifier"),
-    ],
-)
-def test_drop_namespace(catalog: SqlCatalog, table_schema_nested: Schema, 
table_identifier: Identifier) -> None:
-    namespace = Catalog.namespace_from(table_identifier)
-    catalog.create_namespace(namespace)
-    assert catalog._namespace_exists(namespace)
-    catalog.create_table(table_identifier, table_schema_nested)
-    with pytest.raises(NamespaceNotEmptyError):
-        catalog.drop_namespace(namespace)
-    catalog.drop_table(table_identifier)
-    catalog.drop_namespace(namespace)
-    assert not catalog._namespace_exists(namespace)
-
-
[email protected](
-    "catalog",
-    [
-        lazy_fixture("catalog_memory"),
-        lazy_fixture("catalog_sqlite"),
-    ],
-)
-def test_drop_non_existing_namespaces(catalog: SqlCatalog) -> None:
-    with pytest.raises(NoSuchNamespaceError):
-        catalog.drop_namespace("does_not_exist")
-
-
[email protected](
-    "catalog",
-    [
-        lazy_fixture("catalog_memory"),
-        lazy_fixture("catalog_sqlite"),
-    ],
-)
[email protected]("namespace", [lazy_fixture("database_name"), 
lazy_fixture("hierarchical_namespace_name")])
-def test_load_namespace_properties(catalog: SqlCatalog, namespace: str) -> 
None:
-    warehouse_location = "/test/location"
-    test_properties = {
-        "comment": "this is a test description",
-        "location": f"{warehouse_location}/{namespace}",
-        "test_property1": "1",
-        "test_property2": "2",
-        "test_property3": "3",
-    }
-
-    catalog.create_namespace(namespace, test_properties)
-    listed_properties = catalog.load_namespace_properties(namespace)
-    for k, v in listed_properties.items():
-        assert k in test_properties
-        assert v == test_properties[k]
-
-
[email protected](
-    "catalog",
-    [
-        lazy_fixture("catalog_memory"),
-        lazy_fixture("catalog_sqlite"),
-    ],
-)
[email protected]("namespace", [lazy_fixture("database_name"), 
lazy_fixture("hierarchical_namespace_name")])
-def test_load_empty_namespace_properties(catalog: SqlCatalog, namespace: str) 
-> None:
-    catalog.create_namespace(namespace)
-    listed_properties = catalog.load_namespace_properties(namespace)
-    assert listed_properties == {"exists": "true"}
-
-
[email protected](
-    "catalog",
-    [
-        lazy_fixture("catalog_memory"),
-        lazy_fixture("catalog_sqlite"),
-    ],
-)
-def test_load_namespace_properties_non_existing_namespace(catalog: SqlCatalog) 
-> None:
-    with pytest.raises(NoSuchNamespaceError):
-        catalog.load_namespace_properties("does_not_exist")
-
-
[email protected](
-    "catalog",
-    [
-        lazy_fixture("catalog_memory"),
-        lazy_fixture("catalog_sqlite"),
-    ],
-)
[email protected]("namespace", [lazy_fixture("database_name"), 
lazy_fixture("hierarchical_namespace_name")])
-def test_update_namespace_properties(catalog: SqlCatalog, namespace: str) -> 
None:
-    warehouse_location = "/test/location"
-    test_properties = {
-        "comment": "this is a test description",
-        "location": f"{warehouse_location}/{namespace}",
-        "test_property1": "1",
-        "test_property2": "2",
-        "test_property3": "3",
-    }
-    removals = {"test_property1", "test_property2", "test_property3", 
"should_not_removed"}
-    updates = {"test_property4": "4", "test_property5": "5", "comment": 
"updated test description"}
-    catalog.create_namespace(namespace, test_properties)
-    update_report = catalog.update_namespace_properties(namespace, removals, 
updates)
-    for k in updates.keys():
-        assert k in update_report.updated
-    for k in removals:
-        if k == "should_not_removed":
-            assert k in update_report.missing
-        else:
-            assert k in update_report.removed
-    assert catalog.load_namespace_properties(namespace) == {
-        "comment": "updated test description",
-        "test_property4": "4",
-        "test_property5": "5",
-        "location": f"{warehouse_location}/{namespace}",
-    }
-
-
[email protected](
-    "catalog",
-    [
-        lazy_fixture("catalog_memory"),
-        lazy_fixture("catalog_sqlite"),
-        lazy_fixture("catalog_sqlite_without_rowcount"),
-    ],
-)
[email protected](
-    "table_identifier",
-    [
-        lazy_fixture("random_table_identifier"),
-        lazy_fixture("random_hierarchical_identifier"),
-    ],
-)
-def test_commit_table(catalog: SqlCatalog, table_schema_nested: Schema, 
table_identifier: Identifier) -> None:
-    namespace = Catalog.namespace_from(table_identifier)
-    catalog.create_namespace(namespace)
-    table = catalog.create_table(table_identifier, table_schema_nested)
-    last_updated_ms = table.metadata.last_updated_ms
-    original_table_metadata_location = table.metadata_location
-    original_table_last_updated_ms = table.metadata.last_updated_ms
-
-    assert catalog._parse_metadata_version(table.metadata_location) == 0
-    assert table.metadata.current_schema_id == 0
-
-    transaction = table.transaction()
-    update = transaction.update_schema()
-    update.add_column(path="b", field_type=IntegerType())
-    update.commit()
-    transaction.commit_transaction()
-
-    updated_table_metadata = table.metadata
-
-    assert catalog._parse_metadata_version(table.metadata_location) == 1
-    assert updated_table_metadata.current_schema_id == 1
-    assert len(updated_table_metadata.schemas) == 2
-    new_schema = next(schema for schema in updated_table_metadata.schemas if 
schema.schema_id == 1)
-    assert new_schema
-    assert new_schema == update._apply()
-    assert new_schema.find_field("b").field_type == IntegerType()
-    assert updated_table_metadata.last_updated_ms > last_updated_ms
-    assert len(updated_table_metadata.metadata_log) == 1
-    assert updated_table_metadata.metadata_log[0].metadata_file == 
original_table_metadata_location
-    assert updated_table_metadata.metadata_log[0].timestamp_ms == 
original_table_last_updated_ms
-
-
[email protected](
-    "catalog",
-    [
-        lazy_fixture("catalog_memory"),
-        lazy_fixture("catalog_sqlite"),
-        lazy_fixture("catalog_sqlite_without_rowcount"),
-        lazy_fixture("catalog_sqlite_fsspec"),
-    ],
-)
[email protected](
-    "table_identifier",
-    [
-        lazy_fixture("random_table_identifier"),
-        lazy_fixture("random_hierarchical_identifier"),
-    ],
-)
-def test_append_table(catalog: SqlCatalog, table_schema_simple: Schema, 
table_identifier: Identifier) -> None:
-    namespace = Catalog.namespace_from(table_identifier)
-    catalog.create_namespace(namespace)
-    table = catalog.create_table(table_identifier, table_schema_simple)
-
-    df = pa.Table.from_pydict(
-        {
-            "foo": ["a"],
-            "bar": [1],
-            "baz": [True],
-        },
-        schema=schema_to_pyarrow(table_schema_simple),
-    )
-
-    table.append(df)
-
-    # new snapshot is written in APPEND mode
-    assert len(table.metadata.snapshots) == 1
-    assert table.metadata.snapshots[0].snapshot_id == 
table.metadata.current_snapshot_id
-    assert table.metadata.snapshots[0].parent_snapshot_id is None
-    assert table.metadata.snapshots[0].sequence_number == 1
-    assert table.metadata.snapshots[0].summary is not None
-    assert table.metadata.snapshots[0].summary.operation == Operation.APPEND
-    assert table.metadata.snapshots[0].summary["added-data-files"] == "1"
-    assert table.metadata.snapshots[0].summary["added-records"] == "1"
-    assert table.metadata.snapshots[0].summary["total-data-files"] == "1"
-    assert table.metadata.snapshots[0].summary["total-records"] == "1"
-    assert len(table.metadata.metadata_log) == 1
-
-    # read back the data
-    assert df == table.scan().to_arrow()
-
-
[email protected](
-    "catalog",
-    [
-        lazy_fixture("catalog_memory"),
-        lazy_fixture("catalog_sqlite"),
-        lazy_fixture("catalog_sqlite_without_rowcount"),
-    ],
-)
[email protected](
-    "table_identifier",
-    [
-        lazy_fixture("random_table_identifier"),
-        lazy_fixture("random_hierarchical_identifier"),
-    ],
-)
-def test_concurrent_commit_table(catalog: SqlCatalog, table_schema_simple: Schema, table_identifier: Identifier) -> None:
-    namespace = Catalog.namespace_from(table_identifier)
-    catalog.create_namespace(namespace)
-    table_a = catalog.create_table(table_identifier, table_schema_simple)
-    table_b = catalog.load_table(table_identifier)
-
-    with table_a.update_schema() as update:
-        update.add_column(path="b", field_type=IntegerType())
-
-    with pytest.raises(CommitFailedException, match="Requirement failed: current schema id has changed: expected 0, found 1"):
-        # This one should fail since it already has been updated
-        with table_b.update_schema() as update:
-            update.add_column(path="c", field_type=IntegerType())
-
-
[email protected](
-    "catalog",
-    [
-        lazy_fixture("catalog_memory"),
-        lazy_fixture("catalog_sqlite"),
-        lazy_fixture("catalog_sqlite_without_rowcount"),
-    ],
-)
[email protected]("format_version", [1, 2])
-def test_write_and_evolve(catalog: SqlCatalog, format_version: int) -> None:
-    identifier = f"default.arrow_write_data_and_evolve_schema_v{format_version}"
-
-    try:
-        catalog.create_namespace("default")
-    except NamespaceAlreadyExistsError:
-        pass
-
-    try:
-        catalog.drop_table(identifier=identifier)
-    except NoSuchTableError:
-        pass
-
-    pa_table = pa.Table.from_pydict(
-        {
-            "foo": ["a", None, "z"],
-        },
-        schema=pa.schema([pa.field("foo", pa.large_string(), nullable=True)]),
-    )
-
-    tbl = catalog.create_table(identifier=identifier, schema=pa_table.schema, properties={"format-version": str(format_version)})
-
-    pa_table_with_column = pa.Table.from_pydict(
-        {
-            "foo": ["a", None, "z"],
-            "bar": [19, None, 25],
-        },
-        schema=pa.schema(
-            [
-                pa.field("foo", pa.large_string(), nullable=True),
-                pa.field("bar", pa.int32(), nullable=True),
-            ]
-        ),
-    )
-
-    with tbl.transaction() as txn:
-        with txn.update_schema() as schema_txn:
-            schema_txn.union_by_name(pa_table_with_column.schema)
-
-        txn.append(pa_table_with_column)
-        txn.overwrite(pa_table_with_column)
-        txn.delete("foo = 'a'")
-
-
[email protected](
-    "catalog",
-    [
-        lazy_fixture("catalog_memory"),
-        lazy_fixture("catalog_sqlite"),
-        lazy_fixture("catalog_sqlite_without_rowcount"),
-    ],
-)
[email protected]("format_version", [1, 2])
-def test_create_table_transaction(catalog: SqlCatalog, format_version: int) -> None:
-    identifier = f"default.arrow_create_table_transaction_{catalog.name}_{format_version}"
-    try:
-        catalog.create_namespace("default")
-    except NamespaceAlreadyExistsError:
-        pass
-
-    try:
-        catalog.drop_table(identifier=identifier)
-    except NoSuchTableError:
-        pass
-
-    pa_table = pa.Table.from_pydict(
-        {
-            "foo": ["a", None, "z"],
-        },
-        schema=pa.schema([pa.field("foo", pa.large_string(), nullable=True)]),
-    )
-
-    pa_table_with_column = pa.Table.from_pydict(
-        {
-            "foo": ["a", None, "z"],
-            "bar": [19, None, 25],
-        },
-        schema=pa.schema(
-            [
-                pa.field("foo", pa.large_string(), nullable=True),
-                pa.field("bar", pa.int32(), nullable=True),
-            ]
-        ),
-    )
-
-    with catalog.create_table_transaction(
-        identifier=identifier, schema=pa_table.schema, properties={"format-version": str(format_version)}
-    ) as txn:
-        with txn.update_snapshot().fast_append() as snapshot_update:
-            for data_file in _dataframe_to_data_files(table_metadata=txn.table_metadata, df=pa_table, io=txn._table.io):
-                snapshot_update.append_data_file(data_file)
-
-        with txn.update_schema() as schema_txn:
-            schema_txn.union_by_name(pa_table_with_column.schema)
-
-        with txn.update_snapshot().fast_append() as snapshot_update:
-            for data_file in _dataframe_to_data_files(
-                table_metadata=txn.table_metadata, df=pa_table_with_column, io=txn._table.io
-            ):
-                snapshot_update.append_data_file(data_file)
-
-    tbl = catalog.load_table(identifier=identifier)
-    assert tbl.format_version == format_version
-    assert len(tbl.scan().to_arrow()) == 6
-
-
[email protected](
-    "catalog",
-    [
-        lazy_fixture("catalog_memory"),
-        lazy_fixture("catalog_sqlite"),
-        lazy_fixture("catalog_sqlite_without_rowcount"),
-    ],
-)
[email protected](
-    "table_identifier",
-    [
-        lazy_fixture("random_table_identifier"),
-        lazy_fixture("random_hierarchical_identifier"),
-    ],
-)
-def test_table_properties_int_value(catalog: SqlCatalog, table_schema_simple: Schema, table_identifier: Identifier) -> None:
-    # table properties can be set to int, but still serialized to string
-    namespace = Catalog.namespace_from(table_identifier)
-    catalog.create_namespace(namespace)
-    property_with_int = {"property_name": 42}
-    table = catalog.create_table(table_identifier, table_schema_simple, properties=property_with_int)
-    assert isinstance(table.properties["property_name"], str)
-
-
[email protected](
-    "catalog",
-    [
-        lazy_fixture("catalog_memory"),
-        lazy_fixture("catalog_sqlite"),
-        lazy_fixture("catalog_sqlite_without_rowcount"),
-    ],
-)
[email protected](
-    "table_identifier",
-    [
-        lazy_fixture("random_table_identifier"),
-        lazy_fixture("random_hierarchical_identifier"),
-    ],
-)
-def test_table_properties_raise_for_none_value(
-    catalog: SqlCatalog, table_schema_simple: Schema, table_identifier: Identifier
-) -> None:
-    namespace = Catalog.namespace_from(table_identifier)
-    catalog.create_namespace(namespace)
-    property_with_none = {"property_name": None}
-    with pytest.raises(ValidationError) as exc_info:
-        _ = catalog.create_table(table_identifier, table_schema_simple, properties=property_with_none)
-    assert "None type is not a supported value in properties: property_name" in str(exc_info.value)
-
-
[email protected](
-    "catalog",
-    [
-        lazy_fixture("catalog_memory"),
-        lazy_fixture("catalog_sqlite"),
-    ],
-)
[email protected](
-    "table_identifier",
-    [
-        lazy_fixture("random_table_identifier"),
-        lazy_fixture("random_hierarchical_identifier"),
-    ],
-)
-def test_table_exists(catalog: SqlCatalog, table_schema_simple: Schema, table_identifier: Identifier) -> None:
-    namespace = Catalog.namespace_from(table_identifier)
-    catalog.create_namespace(namespace)
-    catalog.create_table(table_identifier, table_schema_simple, properties={"format-version": "2"})
-    existing_table = table_identifier
-    # Act and Assert for an existing table
-    assert catalog.table_exists(existing_table) is True
-
-    # Act and Assert for a non-existing table
-    assert catalog.table_exists(("non", "exist")) is False
-
-
[email protected](
-    "catalog",
-    [
-        lazy_fixture("catalog_memory"),
-        lazy_fixture("catalog_sqlite"),
-    ],
-)
[email protected]("format_version", [1, 2])
-def test_merge_manifests_local_file_system(catalog: SqlCatalog, arrow_table_with_null: pa.Table, format_version: int) -> None:
-    # To catch manifest file name collision bug during merge:
-    # https://github.com/apache/iceberg-python/pull/363#discussion_r1660691918
-    catalog.create_namespace_if_not_exists("default")
-    try:
-        catalog.drop_table("default.test_merge_manifest")
-    except NoSuchTableError:
-        pass
-    tbl = catalog.create_table(
-        "default.test_merge_manifest",
-        arrow_table_with_null.schema,
-        properties={
-            "commit.manifest-merge.enabled": "true",
-            "commit.manifest.min-count-to-merge": "2",
-            "format-version": format_version,
-        },
-    )
-
-    for _ in range(5):
-        tbl.append(arrow_table_with_null)
-
-    assert len(tbl.scan().to_arrow()) == 5 * len(arrow_table_with_null)
-    current_snapshot = tbl.current_snapshot()
-    assert current_snapshot
-    manifests = current_snapshot.manifests(tbl.io)
-    assert len(manifests) == 1
-
-
[email protected](
-    "catalog",
-    [
-        lazy_fixture("catalog_memory"),
-        lazy_fixture("catalog_sqlite"),
-        lazy_fixture("catalog_sqlite_without_rowcount"),
-    ],
-)
-def test_delete_metadata_multiple(catalog: SqlCatalog, table_schema_nested: Schema, random_table_identifier: str) -> None:
-    namespace = Catalog.namespace_from(random_table_identifier)
-    catalog.create_namespace(namespace)
-    table = catalog.create_table(random_table_identifier, table_schema_nested)
-
-    original_metadata_location = table.metadata_location
-
-    for i in range(5):
-        with table.transaction() as transaction:
-            with transaction.update_schema() as update:
-                update.add_column(path=f"new_column_{i}", field_type=IntegerType())
-
-    assert len(table.metadata.metadata_log) == 5
-    assert os.path.exists(original_metadata_location[len("file://") :])
-
-    # Set the max versions property to 2, and delete after commit
-    new_property = {
-        TableProperties.METADATA_PREVIOUS_VERSIONS_MAX: "2",
-        TableProperties.METADATA_DELETE_AFTER_COMMIT_ENABLED: "true",
-    }
-
-    with table.transaction() as transaction:
-        transaction.set_properties(properties=new_property)
-
-    # Verify that only the most recent metadata files are kept
-    assert len(table.metadata.metadata_log) == 2
-    updated_metadata_1, updated_metadata_2 = table.metadata.metadata_log
-
-    # new metadata log was added, so earlier metadata logs are removed.
-    with table.transaction() as transaction:
-        with transaction.update_schema() as update:
-            update.add_column(path="new_column_x", field_type=IntegerType())
-
-    assert len(table.metadata.metadata_log) == 2
-    assert not os.path.exists(original_metadata_location[len("file://") :])
-    assert not os.path.exists(updated_metadata_1.metadata_file[len("file://") :])
-    assert os.path.exists(updated_metadata_2.metadata_file[len("file://") :])
-
-
 class TestSqlCatalogClose:
     """Test SqlCatalog close functionality."""
 
diff --git a/tests/conftest.py b/tests/conftest.py
index 9dff4258..9d80f489 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -45,9 +45,12 @@ import boto3
 import pytest
 from moto import mock_aws
 from pydantic_core import to_json
+from pytest_lazyfixture import lazy_fixture
 
 from pyiceberg.catalog import Catalog, load_catalog
+from pyiceberg.catalog.memory import InMemoryCatalog
 from pyiceberg.catalog.noop import NoopCatalog
+from pyiceberg.catalog.sql import SqlCatalog
 from pyiceberg.expressions import BoundReference
 from pyiceberg.io import (
     ADLS_ACCOUNT_KEY,
@@ -71,6 +74,7 @@ from pyiceberg.serializers import ToOutputFile
 from pyiceberg.table import FileScanTask, Table
 from pyiceberg.table.metadata import TableMetadataV1, TableMetadataV2, TableMetadataV3
 from pyiceberg.transforms import DayTransform, IdentityTransform
+from pyiceberg.typedef import Identifier
 from pyiceberg.types import (
     BinaryType,
     BooleanType,
@@ -434,17 +438,6 @@ def iceberg_schema_simple_no_ids() -> Schema:
     )
 
 
[email protected](scope="session")
-def iceberg_table_schema_simple() -> Schema:
-    return Schema(
-        NestedField(field_id=1, name="foo", field_type=StringType(), required=False),
-        NestedField(field_id=2, name="bar", field_type=IntegerType(), required=True),
-        NestedField(field_id=3, name="baz", field_type=BooleanType(), required=False),
-        schema_id=0,
-        identifier_field_ids=[],
-    )
-
-
 @pytest.fixture(scope="session")
 def iceberg_schema_nested() -> Schema:
     return Schema(
@@ -1887,7 +1880,7 @@ def test_schema() -> Schema:
 
 
 @pytest.fixture(scope="session")
-def test_partition_spec() -> Schema:
+def test_partition_spec() -> PartitionSpec:
     return PartitionSpec(
         PartitionField(1, 1000, IdentityTransform(), "VendorID"),
         PartitionField(2, 1001, DayTransform(), "tpep_pickup_day"),
@@ -2960,3 +2953,159 @@ def ray_session() -> Generator[Any, None, None]:
     )
     yield ray
     ray.shutdown()
+
+
+# Catalog fixtures
+
+
+def _create_memory_catalog(name: str, warehouse: Path) -> InMemoryCatalog:
+    return InMemoryCatalog(name, warehouse=f"file://{warehouse}")
+
+
+def _create_sql_catalog(name: str, warehouse: Path) -> SqlCatalog:
+    catalog = SqlCatalog(
+        name,
+        uri="sqlite:///:memory:",
+        warehouse=f"file://{warehouse}",
+    )
+    catalog.create_tables()
+    return catalog
+
+
+def _create_sql_without_rowcount_catalog(name: str, warehouse: Path) -> SqlCatalog:
+    props = {
+        "uri": f"sqlite:////{warehouse}/sql-catalog",
+        "warehouse": f"file://{warehouse}",
+    }
+    catalog = SqlCatalog(name, **props)
+    catalog.engine.dialect.supports_sane_rowcount = False
+    catalog.create_tables()
+    return catalog
+
+
+_CATALOG_FACTORIES = {
+    "memory": _create_memory_catalog,
+    "sql": _create_sql_catalog,
+    "sql_without_rowcount": _create_sql_without_rowcount_catalog,
+}
+
+
[email protected](params=list(_CATALOG_FACTORIES.keys()))
+def catalog(request: pytest.FixtureRequest, tmp_path: Path) -> Generator[Catalog, None, None]:
+    """Parameterized fixture that yields catalogs listed in _CATALOG_FACTORIES."""
+    catalog_type = request.param
+    factory = _CATALOG_FACTORIES[catalog_type]
+    cat = factory("test_catalog", tmp_path)
+    yield cat
+    if hasattr(cat, "destroy_tables"):
+        cat.destroy_tables()
+
+
[email protected](params=list(_CATALOG_FACTORIES.keys()))
+def catalog_with_warehouse(
+    request: pytest.FixtureRequest,
+    warehouse: Path,
+) -> Generator[Catalog, None, None]:
+    factory = _CATALOG_FACTORIES[request.param]
+    cat = factory("test_catalog", warehouse)
+    yield cat
+    if hasattr(cat, "destroy_tables"):
+        cat.destroy_tables()
+
+
[email protected](name="random_table_identifier")
+def fixture_random_table_identifier(warehouse: Path, database_name: str, table_name: str) -> Identifier:
+    os.makedirs(f"{warehouse}/{database_name}/{table_name}/metadata/", exist_ok=True)
+    return database_name, table_name
+
+
[email protected](name="another_random_table_identifier")
+def fixture_another_random_table_identifier(warehouse: Path, database_name: str, table_name: str) -> Identifier:
+    database_name = database_name + "_new"
+    table_name = table_name + "_new"
+    os.makedirs(f"{warehouse}/{database_name}/{table_name}/metadata/", exist_ok=True)
+    return database_name, table_name
+
+
[email protected](name="random_hierarchical_identifier")
+def fixture_random_hierarchical_identifier(warehouse: Path, hierarchical_namespace_name: str, table_name: str) -> Identifier:
+    os.makedirs(f"{warehouse}/{hierarchical_namespace_name}/{table_name}/metadata/", exist_ok=True)
+    return Catalog.identifier_to_tuple(".".join((hierarchical_namespace_name, table_name)))
+
+
[email protected](name="another_random_hierarchical_identifier")
+def fixture_another_random_hierarchical_identifier(
+    warehouse: Path, hierarchical_namespace_name: str, table_name: str
+) -> Identifier:
+    hierarchical_namespace_name = hierarchical_namespace_name + "_new"
+    table_name = table_name + "_new"
+    os.makedirs(f"{warehouse}/{hierarchical_namespace_name}/{table_name}/metadata/", exist_ok=True)
+    return Catalog.identifier_to_tuple(".".join((hierarchical_namespace_name, table_name)))
+
+
[email protected](scope="session")
+def fixed_test_table_identifier() -> Identifier:
+    return "com", "organization", "department", "my_table"
+
+
[email protected](scope="session")
+def another_fixed_test_table_identifier() -> Identifier:
+    return "com", "organization", "department_alt", "my_another_table"
+
+
[email protected](scope="session")
+def fixed_test_table_namespace() -> Identifier:
+    return "com", "organization", "department"
+
+
[email protected](
+    scope="session",
+    params=[
+        lazy_fixture("fixed_test_table_identifier"),
+        lazy_fixture("random_table_identifier"),
+        lazy_fixture("random_hierarchical_identifier"),
+    ],
+)
+def test_table_identifier(request: pytest.FixtureRequest) -> Identifier:
+    return request.param
+
+
[email protected](
+    scope="session",
+    params=[
+        lazy_fixture("another_fixed_test_table_identifier"),
+        lazy_fixture("another_random_table_identifier"),
+        lazy_fixture("another_random_hierarchical_identifier"),
+    ],
+)
+def another_table_identifier(request: pytest.FixtureRequest) -> Identifier:
+    return request.param
+
+
[email protected](
+    params=[
+        lazy_fixture("database_name"),
+        lazy_fixture("hierarchical_namespace_name"),
+        lazy_fixture("fixed_test_table_namespace"),
+    ],
+)
+def test_namespace(request: pytest.FixtureRequest) -> Identifier:
+    ns = request.param
+    if isinstance(ns, tuple):
+        return ns
+    if "." in ns:
+        return tuple(ns.split("."))
+    return (ns,)
+
+
[email protected](scope="session")
+def test_namespace_properties() -> dict[str, str]:
+    return {"key1": "value1", "key2": "value2"}
+
+
[email protected](scope="session")
+def test_table_properties() -> dict[str, str]:
+    return {
+        "key1": "value1",
+        "key2": "value2",
+    }
diff --git a/tests/table/test_upsert.py b/tests/table/test_upsert.py
index 35a3a119..9bc61799 100644
--- a/tests/table/test_upsert.py
+++ b/tests/table/test_upsert.py
@@ -27,11 +27,11 @@ from pyiceberg.expressions import AlwaysTrue, And, EqualTo, Reference
 from pyiceberg.expressions.literals import LongLiteral
 from pyiceberg.io.pyarrow import schema_to_pyarrow
 from pyiceberg.schema import Schema
-from pyiceberg.table import UpsertResult
+from pyiceberg.table import Table, UpsertResult
 from pyiceberg.table.snapshots import Operation
 from pyiceberg.table.upsert_util import create_match_filter
 from pyiceberg.types import IntegerType, NestedField, StringType, StructType
-from tests.catalog.test_base import InMemoryCatalog, Table
+from tests.catalog.test_base import InMemoryCatalog
 
 
 @pytest.fixture


Reply via email to