This is an automated email from the ASF dual-hosted git repository.
fokko pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-python.git
The following commit(s) were added to refs/heads/main by this push:
new b3a8027f Consolidating InMemoryCatalog and SqlCatalog tests (#2906)
b3a8027f is described below
commit b3a8027f037c41dd3bd68f15756feeee3b9b4a3c
Author: Neelesh Salian <[email protected]>
AuthorDate: Tue Jan 13 13:59:29 2026 -0800
Consolidating InMemoryCatalog and SqlCatalog tests (#2906)
Closes #2870
# Rationale for this change
Eliminates code duplication between `test_base.py` and `test_sql.py` by
consolidating shared catalog behavior tests into a single parameterized
test suite. This ensures consistent behavior testing across both catalog
implementations and makes it easier to add new catalog implementations
with shared test coverage.
## Changes
- New `tests/catalog/test_catalog_behaviors.py` with parameterized tests
covering both catalogs
- Added catalog fixtures to `tests/conftest.py`
- Reduced `test_base.py` to InMemoryCatalog-specific tests
- Reduced `test_sql.py` to SqlCatalog-specific tests
- Fixed return type annotation for `test_partition_spec` fixture in
`tests/conftest.py`
- Removed an import from `tests/table/test_upsert.py`
## Are these changes tested?
`make test` ran successfully locally
## Are there any user-facing changes?
No
<!-- In the case of user-facing changes, please add the changelog label.
-->
---
tests/catalog/test_base.py | 578 +-----------
tests/catalog/test_catalog_behaviors.py | 1192 ++++++++++++++++++++++++
tests/catalog/test_sql.py | 1509 +------------------------------
tests/conftest.py | 173 +++-
tests/table/test_upsert.py | 4 +-
5 files changed, 1360 insertions(+), 2096 deletions(-)
diff --git a/tests/catalog/test_base.py b/tests/catalog/test_base.py
index d91bfcdb..1a474783 100644
--- a/tests/catalog/test_base.py
+++ b/tests/catalog/test_base.py
@@ -20,35 +20,13 @@
from collections.abc import Generator
from pathlib import PosixPath
-import pyarrow as pa
import pytest
-from pydantic_core import ValidationError
-from pytest_lazyfixture import lazy_fixture
from pyiceberg.catalog import Catalog, load_catalog
from pyiceberg.catalog.memory import InMemoryCatalog
-from pyiceberg.exceptions import (
- NamespaceAlreadyExistsError,
- NamespaceNotEmptyError,
- NoSuchNamespaceError,
- NoSuchTableError,
- TableAlreadyExistsError,
-)
from pyiceberg.io import WAREHOUSE
-from pyiceberg.io.pyarrow import schema_to_pyarrow
-from pyiceberg.partitioning import PartitionField, PartitionSpec
from pyiceberg.schema import Schema
-from pyiceberg.table import (
- Table,
- TableProperties,
-)
-from pyiceberg.table.update import (
- AddSchemaUpdate,
- SetCurrentSchemaUpdate,
-)
-from pyiceberg.transforms import IdentityTransform
-from pyiceberg.typedef import EMPTY_DICT, Properties
-from pyiceberg.types import IntegerType, LongType, NestedField, StringType
+from pyiceberg.types import NestedField, StringType
@pytest.fixture
@@ -58,38 +36,6 @@ def catalog(tmp_path: PosixPath) -> Generator[Catalog, None, None]:
catalog.close()
-TEST_TABLE_IDENTIFIER = ("com", "organization", "department", "my_table")
-TEST_TABLE_NAMESPACE = ("com", "organization", "department")
-TEST_TABLE_NAME = "my_table"
-TEST_TABLE_SCHEMA = Schema(
- NestedField(1, "x", LongType(), required=True),
- NestedField(2, "y", LongType(), doc="comment", required=True),
- NestedField(3, "z", LongType(), required=True),
-)
-TEST_TABLE_PARTITION_SPEC = PartitionSpec(PartitionField(name="x",
transform=IdentityTransform(), source_id=1, field_id=1000))
-TEST_TABLE_PROPERTIES = {"key1": "value1", "key2": "value2"}
-NO_SUCH_TABLE_ERROR = "Table does not exist:
com.organization.department.my_table"
-TABLE_ALREADY_EXISTS_ERROR = "Table com.organization.department.my_table
already exists"
-NAMESPACE_ALREADY_EXISTS_ERROR = "Namespace \\('com', 'organization',
'department'\\) already exists"
-# TODO: consolidate namespace error messages then remove this
-DROP_NOT_EXISTING_NAMESPACE_ERROR = "Namespace does not exist: \\('com',
'organization', 'department'\\)"
-NO_SUCH_NAMESPACE_ERROR = "Namespace com.organization.department does not
exists"
-NAMESPACE_NOT_EMPTY_ERROR = "Namespace com.organization.department is not
empty"
-
-
-def given_catalog_has_a_table(
- catalog: InMemoryCatalog,
- properties: Properties = EMPTY_DICT,
-) -> Table:
- catalog.create_namespace(TEST_TABLE_NAMESPACE)
- return catalog.create_table(
- identifier=TEST_TABLE_IDENTIFIER,
- schema=TEST_TABLE_SCHEMA,
- partition_spec=TEST_TABLE_PARTITION_SPEC,
- properties=properties or TEST_TABLE_PROPERTIES,
- )
-
-
def test_load_catalog_in_memory() -> None:
assert load_catalog("catalog", type="in-memory")
@@ -118,533 +64,11 @@ def test_load_catalog_has_type_and_impl() -> None:
)
-def test_namespace_from_tuple() -> None:
- # Given
- identifier = ("com", "organization", "department", "my_table")
- # When
- namespace_from = Catalog.namespace_from(identifier)
- # Then
- assert namespace_from == ("com", "organization", "department")
-
-
-def test_namespace_from_str() -> None:
- # Given
- identifier = "com.organization.department.my_table"
- # When
- namespace_from = Catalog.namespace_from(identifier)
- # Then
- assert namespace_from == ("com", "organization", "department")
-
-
-def test_name_from_tuple() -> None:
- # Given
- identifier = ("com", "organization", "department", "my_table")
- # When
- name_from = Catalog.table_name_from(identifier)
- # Then
- assert name_from == "my_table"
-
-
-def test_name_from_str() -> None:
- # Given
- identifier = "com.organization.department.my_table"
- # When
- name_from = Catalog.table_name_from(identifier)
- # Then
- assert name_from == "my_table"
-
-
-def test_create_table(catalog: InMemoryCatalog) -> None:
- catalog.create_namespace(TEST_TABLE_NAMESPACE)
- table = catalog.create_table(
- identifier=TEST_TABLE_IDENTIFIER,
- schema=TEST_TABLE_SCHEMA,
- partition_spec=TEST_TABLE_PARTITION_SPEC,
- properties=TEST_TABLE_PROPERTIES,
- )
- assert catalog.load_table(TEST_TABLE_IDENTIFIER) == table
-
-
-def test_create_table_location_override(catalog: InMemoryCatalog) -> None:
- new_location = f"{catalog._warehouse_location}/new_location"
- catalog.create_namespace(TEST_TABLE_NAMESPACE)
- table = catalog.create_table(
- identifier=TEST_TABLE_IDENTIFIER,
- schema=TEST_TABLE_SCHEMA,
- location=new_location,
- partition_spec=TEST_TABLE_PARTITION_SPEC,
- properties=TEST_TABLE_PROPERTIES,
- )
- assert catalog.load_table(TEST_TABLE_IDENTIFIER) == table
- assert table.location() == new_location
-
-
-def test_create_table_removes_trailing_slash_from_location(catalog:
InMemoryCatalog) -> None:
- new_location = f"{catalog._warehouse_location}/new_location"
- catalog.create_namespace(TEST_TABLE_NAMESPACE)
- table = catalog.create_table(
- identifier=TEST_TABLE_IDENTIFIER,
- schema=TEST_TABLE_SCHEMA,
- location=f"{new_location}/",
- partition_spec=TEST_TABLE_PARTITION_SPEC,
- properties=TEST_TABLE_PROPERTIES,
- )
- assert catalog.load_table(TEST_TABLE_IDENTIFIER) == table
- assert table.location() == new_location
-
-
-@pytest.mark.parametrize(
- "schema,expected",
- [
- (lazy_fixture("pyarrow_schema_simple_without_ids"),
lazy_fixture("iceberg_schema_simple_no_ids")),
- (lazy_fixture("iceberg_schema_simple"),
lazy_fixture("iceberg_schema_simple")),
- (lazy_fixture("iceberg_schema_nested"),
lazy_fixture("iceberg_schema_nested")),
- (lazy_fixture("pyarrow_schema_nested_without_ids"),
lazy_fixture("iceberg_schema_nested_no_ids")),
- ],
-)
-def test_convert_schema_if_needed(
- schema: Schema | pa.Schema,
- expected: Schema,
- catalog: InMemoryCatalog,
-) -> None:
- assert expected == catalog._convert_schema_if_needed(schema)
-
-
-def test_convert_schema_if_needed_rejects_null_type(catalog: InMemoryCatalog)
-> None:
- schema = pa.schema([pa.field("n1", pa.null())])
- with pytest.raises(ValueError) as exc_info:
- catalog._convert_schema_if_needed(schema)
- message = str(exc_info.value)
- assert "Null type" in message
- assert "n1" in message
- assert "format-version=3" in message
-
-
-def test_create_table_pyarrow_schema(catalog: InMemoryCatalog,
pyarrow_schema_simple_without_ids: pa.Schema) -> None:
- catalog.create_namespace(TEST_TABLE_NAMESPACE)
- table = catalog.create_table(
- identifier=TEST_TABLE_IDENTIFIER,
- schema=pyarrow_schema_simple_without_ids,
- properties=TEST_TABLE_PROPERTIES,
- )
- assert catalog.load_table(TEST_TABLE_IDENTIFIER) == table
-
-
-def test_create_table_raises_error_when_table_already_exists(catalog:
InMemoryCatalog) -> None:
- # Given
- given_catalog_has_a_table(catalog)
- # When
- with pytest.raises(TableAlreadyExistsError,
match=TABLE_ALREADY_EXISTS_ERROR):
- catalog.create_table(
- identifier=TEST_TABLE_IDENTIFIER,
- schema=TEST_TABLE_SCHEMA,
- )
-
-
-def test_load_table(catalog: InMemoryCatalog) -> None:
- # Given
- given_table = given_catalog_has_a_table(catalog)
- # When
- table = catalog.load_table(TEST_TABLE_IDENTIFIER)
- # Then
- assert table == given_table
-
-
-def test_load_table_from_self_identifier(catalog: InMemoryCatalog) -> None:
- # Given
- given_table = given_catalog_has_a_table(catalog)
- # When
- intermediate = catalog.load_table(TEST_TABLE_IDENTIFIER)
- table = catalog.load_table(intermediate._identifier)
- # Then
- assert table == given_table
-
-
-def test_table_raises_error_on_table_not_found(catalog: InMemoryCatalog) ->
None:
- with pytest.raises(NoSuchTableError, match=NO_SUCH_TABLE_ERROR):
- catalog.load_table(TEST_TABLE_IDENTIFIER)
-
-
-def test_table_exists(catalog: InMemoryCatalog) -> None:
- # Given
- given_catalog_has_a_table(catalog)
- # Then
- assert catalog.table_exists(TEST_TABLE_IDENTIFIER)
-
-
-def test_table_exists_on_table_not_found(catalog: InMemoryCatalog) -> None:
- assert not catalog.table_exists(TEST_TABLE_IDENTIFIER)
-
-
-def test_drop_table(catalog: InMemoryCatalog) -> None:
- # Given
- given_catalog_has_a_table(catalog)
- # When
- catalog.drop_table(TEST_TABLE_IDENTIFIER)
- # Then
- with pytest.raises(NoSuchTableError, match=NO_SUCH_TABLE_ERROR):
- catalog.load_table(TEST_TABLE_IDENTIFIER)
-
-
-def test_drop_table_from_self_identifier(catalog: InMemoryCatalog) -> None:
- # Given
- table = given_catalog_has_a_table(catalog)
- # When
- catalog.drop_table(table._identifier)
- # Then
- with pytest.raises(NoSuchTableError, match=NO_SUCH_TABLE_ERROR):
- catalog.load_table(table._identifier)
- with pytest.raises(NoSuchTableError, match=NO_SUCH_TABLE_ERROR):
- catalog.load_table(TEST_TABLE_IDENTIFIER)
-
-
-def test_drop_table_that_does_not_exist_raise_error(catalog: InMemoryCatalog)
-> None:
- with pytest.raises(NoSuchTableError, match=NO_SUCH_TABLE_ERROR):
- catalog.load_table(TEST_TABLE_IDENTIFIER)
-
-
-def test_purge_table(catalog: InMemoryCatalog) -> None:
- # Given
- given_catalog_has_a_table(catalog)
- # When
- catalog.purge_table(TEST_TABLE_IDENTIFIER)
- # Then
- with pytest.raises(NoSuchTableError, match=NO_SUCH_TABLE_ERROR):
- catalog.load_table(TEST_TABLE_IDENTIFIER)
-
-
-def test_rename_table(catalog: InMemoryCatalog) -> None:
- # Given
- given_catalog_has_a_table(catalog)
-
- # When
- new_table = "new.namespace.new_table"
- catalog.create_namespace(("new", "namespace"))
- table = catalog.rename_table(TEST_TABLE_IDENTIFIER, new_table)
-
- # Then
- assert table._identifier == Catalog.identifier_to_tuple(new_table)
-
- # And
- table = catalog.load_table(new_table)
- assert table._identifier == Catalog.identifier_to_tuple(new_table)
-
- # And
- assert catalog._namespace_exists(table._identifier[:-1])
-
- # And
- with pytest.raises(NoSuchTableError, match=NO_SUCH_TABLE_ERROR):
- catalog.load_table(TEST_TABLE_IDENTIFIER)
-
-
-def test_rename_table_from_self_identifier(catalog: InMemoryCatalog) -> None:
- # Given
- table = given_catalog_has_a_table(catalog)
-
- # When
- new_table_name = "new.namespace.new_table"
- catalog.create_namespace(("new", "namespace"))
- new_table = catalog.rename_table(table._identifier, new_table_name)
-
- # Then
- assert new_table._identifier == Catalog.identifier_to_tuple(new_table_name)
-
- # And
- new_table = catalog.load_table(new_table._identifier)
- assert new_table._identifier == Catalog.identifier_to_tuple(new_table_name)
-
- # And
- assert catalog._namespace_exists(new_table._identifier[:-1])
-
- # And
- with pytest.raises(NoSuchTableError, match=NO_SUCH_TABLE_ERROR):
- catalog.load_table(table._identifier)
- with pytest.raises(NoSuchTableError, match=NO_SUCH_TABLE_ERROR):
- catalog.load_table(TEST_TABLE_IDENTIFIER)
-
-
-def test_create_namespace(catalog: InMemoryCatalog) -> None:
- # When
- catalog.create_namespace(TEST_TABLE_NAMESPACE, TEST_TABLE_PROPERTIES)
-
- # Then
- assert catalog._namespace_exists(TEST_TABLE_NAMESPACE)
- assert TEST_TABLE_PROPERTIES ==
catalog.load_namespace_properties(TEST_TABLE_NAMESPACE)
-
-
-def test_create_namespace_raises_error_on_existing_namespace(catalog:
InMemoryCatalog) -> None:
- # Given
- catalog.create_namespace(TEST_TABLE_NAMESPACE, TEST_TABLE_PROPERTIES)
- # When
- with pytest.raises(NamespaceAlreadyExistsError,
match=NAMESPACE_ALREADY_EXISTS_ERROR):
- catalog.create_namespace(TEST_TABLE_NAMESPACE, TEST_TABLE_PROPERTIES)
-
-
-def
test_get_namespace_metadata_raises_error_when_namespace_does_not_exist(catalog:
InMemoryCatalog) -> None:
- with pytest.raises(NoSuchNamespaceError, match=NO_SUCH_NAMESPACE_ERROR):
- catalog.load_namespace_properties(TEST_TABLE_NAMESPACE)
-
-
-def test_list_namespaces(catalog: InMemoryCatalog) -> None:
- # Given
- catalog.create_namespace(TEST_TABLE_NAMESPACE, TEST_TABLE_PROPERTIES)
- # When
- namespaces = catalog.list_namespaces()
- # Then
- assert TEST_TABLE_NAMESPACE[:1] in namespaces
-
- # When
- namespaces = catalog.list_namespaces(TEST_TABLE_NAMESPACE)
- # Then
- assert not namespaces
-
-
-def test_drop_namespace(catalog: InMemoryCatalog) -> None:
- # Given
- catalog.create_namespace(TEST_TABLE_NAMESPACE, TEST_TABLE_PROPERTIES)
- # When
- catalog.drop_namespace(TEST_TABLE_NAMESPACE)
- # Then
- assert not catalog._namespace_exists(TEST_TABLE_NAMESPACE)
-
-
-def test_drop_namespace_raises_error_when_namespace_does_not_exist(catalog:
InMemoryCatalog) -> None:
- with pytest.raises(NoSuchNamespaceError,
match=DROP_NOT_EXISTING_NAMESPACE_ERROR):
- catalog.drop_namespace(TEST_TABLE_NAMESPACE)
-
-
-def test_drop_namespace_raises_error_when_namespace_not_empty(catalog:
InMemoryCatalog) -> None:
- # Given
- given_catalog_has_a_table(catalog)
- # When
- with pytest.raises(NamespaceNotEmptyError,
match=NAMESPACE_NOT_EMPTY_ERROR):
- catalog.drop_namespace(TEST_TABLE_NAMESPACE)
-
-
-def test_list_tables(catalog: InMemoryCatalog) -> None:
- # Given
- given_catalog_has_a_table(catalog)
- # When
- tables = catalog.list_tables(namespace=TEST_TABLE_NAMESPACE)
- # Then
- assert tables
- assert TEST_TABLE_IDENTIFIER in tables
-
-
-def test_list_tables_under_a_namespace(catalog: InMemoryCatalog) -> None:
- # Given
- given_catalog_has_a_table(catalog)
- new_namespace = ("new", "namespace")
- catalog.create_namespace(new_namespace)
- # When
- all_tables = catalog.list_tables(namespace=TEST_TABLE_NAMESPACE)
- new_namespace_tables = catalog.list_tables(new_namespace)
- # Then
- assert all_tables
- assert TEST_TABLE_IDENTIFIER in all_tables
- assert new_namespace_tables == []
-
-
-def test_update_namespace_metadata(catalog: InMemoryCatalog) -> None:
- # Given
- catalog.create_namespace(TEST_TABLE_NAMESPACE, TEST_TABLE_PROPERTIES)
-
- # When
- new_metadata = {"key3": "value3", "key4": "value4"}
- summary = catalog.update_namespace_properties(TEST_TABLE_NAMESPACE,
updates=new_metadata)
-
- # Then
- assert catalog._namespace_exists(TEST_TABLE_NAMESPACE)
- assert new_metadata.items() <=
catalog.load_namespace_properties(TEST_TABLE_NAMESPACE).items()
- assert summary.removed == []
- assert sorted(summary.updated) == ["key3", "key4"]
- assert summary.missing == []
-
-
-def test_update_namespace_metadata_removals(catalog: InMemoryCatalog) -> None:
- # Given
- catalog.create_namespace(TEST_TABLE_NAMESPACE, TEST_TABLE_PROPERTIES)
-
- # When
- new_metadata = {"key3": "value3", "key4": "value4"}
- remove_metadata = {"key1"}
- summary = catalog.update_namespace_properties(TEST_TABLE_NAMESPACE,
remove_metadata, new_metadata)
-
- # Then
- assert catalog._namespace_exists(TEST_TABLE_NAMESPACE)
- assert new_metadata.items() <=
catalog.load_namespace_properties(TEST_TABLE_NAMESPACE).items()
- assert
remove_metadata.isdisjoint(catalog.load_namespace_properties(TEST_TABLE_NAMESPACE).keys())
- assert summary.removed == ["key1"]
- assert sorted(summary.updated) == ["key3", "key4"]
- assert summary.missing == []
-
-
-def
test_update_namespace_metadata_raises_error_when_namespace_does_not_exist(catalog:
InMemoryCatalog) -> None:
- with pytest.raises(NoSuchNamespaceError, match=NO_SUCH_NAMESPACE_ERROR):
- catalog.update_namespace_properties(TEST_TABLE_NAMESPACE,
updates=TEST_TABLE_PROPERTIES)
-
-
-def test_commit_table(catalog: InMemoryCatalog) -> None:
- # Given
- given_table = given_catalog_has_a_table(catalog)
- new_schema = Schema(
- NestedField(1, "x", LongType()),
- NestedField(2, "y", LongType(), doc="comment"),
- NestedField(3, "z", LongType()),
- NestedField(4, "add", LongType()),
- )
-
- # When
- response = given_table.catalog.commit_table(
- given_table,
- updates=(
- AddSchemaUpdate(schema=new_schema),
- SetCurrentSchemaUpdate(schema_id=-1),
- ),
- requirements=(),
- )
-
- # Then
- assert response.metadata.table_uuid == given_table.metadata.table_uuid
- assert len(response.metadata.schemas) == 2
- assert response.metadata.schemas[1] == new_schema
- assert response.metadata.current_schema_id == new_schema.schema_id
-
-
-def test_add_column(catalog: InMemoryCatalog) -> None:
- given_table = given_catalog_has_a_table(catalog)
-
- given_table.update_schema().add_column(path="new_column1",
field_type=IntegerType()).commit()
-
- assert given_table.schema() == Schema(
- NestedField(field_id=1, name="x", field_type=LongType(),
required=True),
- NestedField(field_id=2, name="y", field_type=LongType(),
required=True, doc="comment"),
- NestedField(field_id=3, name="z", field_type=LongType(),
required=True),
- NestedField(field_id=4, name="new_column1", field_type=IntegerType(),
required=False),
- identifier_field_ids=[],
- )
- assert given_table.schema().schema_id == 1
-
- transaction = given_table.transaction()
- transaction.update_schema().add_column(path="new_column2",
field_type=IntegerType(), doc="doc").commit()
- transaction.commit_transaction()
-
- assert given_table.schema() == Schema(
- NestedField(field_id=1, name="x", field_type=LongType(),
required=True),
- NestedField(field_id=2, name="y", field_type=LongType(),
required=True, doc="comment"),
- NestedField(field_id=3, name="z", field_type=LongType(),
required=True),
- NestedField(field_id=4, name="new_column1", field_type=IntegerType(),
required=False),
- NestedField(field_id=5, name="new_column2", field_type=IntegerType(),
required=False, doc="doc"),
- identifier_field_ids=[],
- )
- assert given_table.schema().schema_id == 2
-
-
-def test_add_column_with_statement(catalog: InMemoryCatalog) -> None:
- given_table = given_catalog_has_a_table(catalog)
-
- with given_table.update_schema() as tx:
- tx.add_column(path="new_column1", field_type=IntegerType())
-
- assert given_table.schema() == Schema(
- NestedField(field_id=1, name="x", field_type=LongType(),
required=True),
- NestedField(field_id=2, name="y", field_type=LongType(),
required=True, doc="comment"),
- NestedField(field_id=3, name="z", field_type=LongType(),
required=True),
- NestedField(field_id=4, name="new_column1", field_type=IntegerType(),
required=False),
- identifier_field_ids=[],
- )
- assert given_table.schema().schema_id == 1
-
- with given_table.transaction() as tx:
- tx.update_schema().add_column(path="new_column2",
field_type=IntegerType(), doc="doc").commit()
-
- assert given_table.schema() == Schema(
- NestedField(field_id=1, name="x", field_type=LongType(),
required=True),
- NestedField(field_id=2, name="y", field_type=LongType(),
required=True, doc="comment"),
- NestedField(field_id=3, name="z", field_type=LongType(),
required=True),
- NestedField(field_id=4, name="new_column1", field_type=IntegerType(),
required=False),
- NestedField(field_id=5, name="new_column2", field_type=IntegerType(),
required=False, doc="doc"),
- identifier_field_ids=[],
- )
- assert given_table.schema().schema_id == 2
-
-
def test_catalog_repr(catalog: InMemoryCatalog) -> None:
s = repr(catalog)
assert s == "test.in_memory.catalog (<class
'pyiceberg.catalog.memory.InMemoryCatalog'>)"
-def test_table_properties_int_value(catalog: InMemoryCatalog) -> None:
- # table properties can be set to int, but still serialized to string
- property_with_int = {"property_name": 42}
- given_table = given_catalog_has_a_table(catalog,
properties=property_with_int)
- assert isinstance(given_table.properties["property_name"], str)
-
-
-def test_table_properties_raise_for_none_value(catalog: InMemoryCatalog) ->
None:
- property_with_none = {"property_name": None}
- with pytest.raises(ValidationError) as exc_info:
- _ = given_catalog_has_a_table(catalog, properties=property_with_none)
- assert "None type is not a supported value in properties: property_name"
in str(exc_info.value)
-
-
-def test_table_writes_metadata_to_custom_location(catalog: InMemoryCatalog) ->
None:
- metadata_path = f"{catalog._warehouse_location}/custom/path"
- catalog.create_namespace(TEST_TABLE_NAMESPACE)
- table = catalog.create_table(
- identifier=TEST_TABLE_IDENTIFIER,
- schema=TEST_TABLE_SCHEMA,
- partition_spec=TEST_TABLE_PARTITION_SPEC,
- properties={TableProperties.WRITE_METADATA_PATH: metadata_path},
- )
- df = pa.Table.from_pylist([{"x": 123, "y": 456, "z": 789}],
schema=schema_to_pyarrow(TEST_TABLE_SCHEMA))
- table.append(df)
- manifests = table.current_snapshot().manifests(table.io) # type: ignore
- location_provider = table.location_provider()
-
- assert
location_provider.new_metadata_location("").startswith(metadata_path)
- assert manifests[0].manifest_path.startswith(metadata_path)
- assert table.location() != metadata_path
- assert table.metadata_location.startswith(metadata_path)
-
-
-def test_table_writes_metadata_to_default_path(catalog: InMemoryCatalog) ->
None:
- catalog.create_namespace(TEST_TABLE_NAMESPACE)
- table = catalog.create_table(
- identifier=TEST_TABLE_IDENTIFIER,
- schema=TEST_TABLE_SCHEMA,
- partition_spec=TEST_TABLE_PARTITION_SPEC,
- properties=TEST_TABLE_PROPERTIES,
- )
- metadata_path = f"{table.location()}/metadata"
- df = pa.Table.from_pylist([{"x": 123, "y": 456, "z": 789}],
schema=schema_to_pyarrow(TEST_TABLE_SCHEMA))
- table.append(df)
- manifests = table.current_snapshot().manifests(table.io) # type: ignore
- location_provider = table.location_provider()
-
- assert
location_provider.new_metadata_location("").startswith(metadata_path)
- assert manifests[0].manifest_path.startswith(metadata_path)
- assert table.metadata_location.startswith(metadata_path)
-
-
-def test_table_metadata_writes_reflect_latest_path(catalog: InMemoryCatalog)
-> None:
- catalog.create_namespace(TEST_TABLE_NAMESPACE)
- table = catalog.create_table(
- identifier=TEST_TABLE_IDENTIFIER,
- schema=TEST_TABLE_SCHEMA,
- partition_spec=TEST_TABLE_PARTITION_SPEC,
- )
-
- initial_metadata_path = f"{table.location()}/metadata"
- assert table.location_provider().new_metadata_location("metadata.json") ==
f"{initial_metadata_path}/metadata.json"
-
- # update table with new path for metadata
- new_metadata_path = f"{table.location()}/custom/path"
- table.transaction().set_properties({TableProperties.WRITE_METADATA_PATH:
new_metadata_path}).commit_transaction()
-
- assert table.location_provider().new_metadata_location("metadata.json") ==
f"{new_metadata_path}/metadata.json"
-
-
class TestCatalogClose:
"""Test catalog close functionality."""
diff --git a/tests/catalog/test_catalog_behaviors.py b/tests/catalog/test_catalog_behaviors.py
new file mode 100644
index 00000000..29689358
--- /dev/null
+++ b/tests/catalog/test_catalog_behaviors.py
@@ -0,0 +1,1192 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+Consolidated behavior tests for InMemoryCatalog and SqlCatalog.
+"""
+
+import os
+from pathlib import Path
+from typing import Any
+
+import pyarrow as pa
+import pytest
+from pydantic_core import ValidationError
+from pytest_lazyfixture import lazy_fixture
+from sqlalchemy.exc import IntegrityError
+
+from pyiceberg.catalog import Catalog
+from pyiceberg.exceptions import (
+ CommitFailedException,
+ NamespaceAlreadyExistsError,
+ NamespaceNotEmptyError,
+ NoSuchNamespaceError,
+ NoSuchTableError,
+ TableAlreadyExistsError,
+)
+from pyiceberg.io.pyarrow import _dataframe_to_data_files, schema_to_pyarrow
+from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC,
PartitionField, PartitionSpec
+from pyiceberg.schema import Schema
+from pyiceberg.table import TableProperties
+from pyiceberg.table.snapshots import Operation
+from pyiceberg.table.sorting import NullOrder, SortDirection, SortField,
SortOrder
+from pyiceberg.table.update import AddSchemaUpdate, SetCurrentSchemaUpdate
+from pyiceberg.transforms import IdentityTransform
+from pyiceberg.typedef import Identifier
+from pyiceberg.types import BooleanType, IntegerType, LongType, NestedField,
StringType
+
+
+# Name parsing tests
+def test_namespace_from_tuple() -> None:
+ identifier = ("com", "organization", "department", "my_table")
+ namespace_from = Catalog.namespace_from(identifier)
+ assert namespace_from == ("com", "organization", "department")
+
+
+def test_namespace_from_str() -> None:
+ identifier = "com.organization.department.my_table"
+ namespace_from = Catalog.namespace_from(identifier)
+ assert namespace_from == ("com", "organization", "department")
+
+
+def test_name_from_tuple() -> None:
+ identifier = ("com", "organization", "department", "my_table")
+ name_from = Catalog.table_name_from(identifier)
+ assert name_from == "my_table"
+
+
+def test_name_from_str() -> None:
+ identifier = "com.organization.department.my_table"
+ name_from = Catalog.table_name_from(identifier)
+ assert name_from == "my_table"
+
+
+# Create table tests
+def test_create_table(catalog: Catalog, test_table_identifier: Identifier,
table_schema_simple: Schema) -> None:
+ namespace = Catalog.namespace_from(test_table_identifier)
+ catalog.create_namespace(namespace)
+ table = catalog.create_table(test_table_identifier, table_schema_simple)
+ loaded = catalog.load_table(test_table_identifier)
+ assert loaded.name() == table.name()
+ assert loaded.metadata_location == table.metadata_location
+
+
+def test_create_table_if_not_exists_duplicated_table(
+ catalog: Catalog, table_schema_nested: Schema, test_table_identifier:
Identifier
+) -> None:
+ namespace = Catalog.namespace_from(test_table_identifier)
+ catalog.create_namespace(namespace)
+ table1 = catalog.create_table(test_table_identifier, table_schema_nested)
+ table2 = catalog.create_table_if_not_exists(test_table_identifier,
table_schema_nested)
+ assert table1.name() == table2.name()
+
+
+def test_create_table_raises_error_when_table_already_exists(
+ catalog: Catalog, test_table_identifier: Identifier, table_schema_nested:
Schema
+) -> None:
+ namespace = Catalog.namespace_from(test_table_identifier)
+ catalog.create_namespace(namespace)
+ catalog.create_table(test_table_identifier, table_schema_nested)
+ with pytest.raises(TableAlreadyExistsError):
+ catalog.create_table(test_table_identifier, table_schema_nested)
+
+
+def test_table_exists(catalog: Catalog, test_table_identifier: Identifier,
table_schema_nested: Schema) -> None:
+ namespace = Catalog.namespace_from(test_table_identifier)
+ catalog.create_namespace(namespace)
+ catalog.create_table(test_table_identifier, table_schema_nested,
properties={"format-version": "2"})
+ assert catalog.table_exists(test_table_identifier)
+
+
+def test_table_exists_on_table_not_found(catalog: Catalog,
test_table_identifier: Identifier) -> None:
+ assert not catalog.table_exists(test_table_identifier)
+
+
+def test_create_table_raises_error_when_namespace_does_not_exist(catalog:
Catalog, table_schema_simple: Schema) -> None:
+ with pytest.raises(NoSuchNamespaceError):
+ catalog.create_table(("non_existent_ns", "table"), table_schema_simple)
+
+
+def test_table_raises_error_on_table_not_found(catalog: Catalog,
test_table_identifier: Identifier) -> None:
+ identifier_str = ".".join(test_table_identifier)
+ with pytest.raises(NoSuchTableError, match=f"Table does not exist:
{identifier_str}"):
+ catalog.load_table(test_table_identifier)
+
+
+def test_create_table_without_namespace(catalog: Catalog, table_schema_nested:
Schema, table_name: str) -> None:
+ with pytest.raises(NoSuchNamespaceError):
+ catalog.create_table(table_name, table_schema_nested)
+
+
+@pytest.mark.parametrize("format_version", [1, 2])
+def test_create_table_transaction(catalog: Catalog, format_version: int) ->
None:
+ identifier =
f"default.arrow_create_table_transaction_{catalog.name}_{format_version}"
+ try:
+ catalog.create_namespace("default")
+ except NamespaceAlreadyExistsError:
+ pass
+
+ try:
+ catalog.drop_table(identifier=identifier)
+ except NoSuchTableError:
+ pass
+
+ pa_table = pa.Table.from_pydict(
+ {
+ "foo": ["a", None, "z"],
+ },
+ schema=pa.schema([pa.field("foo", pa.large_string(), nullable=True)]),
+ )
+
+ pa_table_with_column = pa.Table.from_pydict(
+ {
+ "foo": ["a", None, "z"],
+ "bar": [19, None, 25],
+ },
+ schema=pa.schema(
+ [
+ pa.field("foo", pa.large_string(), nullable=True),
+ pa.field("bar", pa.int32(), nullable=True),
+ ]
+ ),
+ )
+
+ with catalog.create_table_transaction(
+ identifier=identifier, schema=pa_table.schema,
properties={"format-version": str(format_version)}
+ ) as txn:
+ with txn.update_snapshot().fast_append() as snapshot_update:
+ for data_file in
_dataframe_to_data_files(table_metadata=txn.table_metadata, df=pa_table,
io=txn._table.io):
+ snapshot_update.append_data_file(data_file)
+
+ with txn.update_schema() as schema_txn:
+ schema_txn.union_by_name(pa_table_with_column.schema)
+
+ with txn.update_snapshot().fast_append() as snapshot_update:
+ for data_file in _dataframe_to_data_files(
+ table_metadata=txn.table_metadata, df=pa_table_with_column,
io=txn._table.io
+ ):
+ snapshot_update.append_data_file(data_file)
+
+ tbl = catalog.load_table(identifier=identifier)
+ assert tbl.format_version == format_version
+ assert len(tbl.scan().to_arrow()) == 6
+
+
+def test_create_table_default_sort_order(
+ catalog: Catalog, table_schema_nested: Schema, test_table_identifier:
Identifier
+) -> None:
+ namespace = Catalog.namespace_from(test_table_identifier)
+ catalog.create_namespace(namespace)
+ table = catalog.create_table(test_table_identifier, table_schema_nested)
+ assert table.sort_order().order_id == 0, "Order ID must match"
+ assert table.sort_order().is_unsorted is True, "Order must be unsorted"
+ catalog.drop_table(test_table_identifier)
+
+
def test_create_v1_table(catalog: Catalog, table_schema_nested: Schema, test_table_identifier: Identifier) -> None:
    """A table created with ``format-version`` 1 is v1, unsorted and unpartitioned."""
    catalog.create_namespace(Catalog.namespace_from(test_table_identifier))
    v1_table = catalog.create_table(test_table_identifier, table_schema_nested, properties={"format-version": "1"})
    order = v1_table.sort_order()
    assert order.order_id == 0, "Order ID must match"
    assert order.is_unsorted is True, "Order must be unsorted"
    assert v1_table.format_version == 1
    assert v1_table.spec() == UNPARTITIONED_PARTITION_SPEC
    catalog.drop_table(test_table_identifier)
+
+
def test_create_table_custom_sort_order(catalog: Catalog, table_schema_nested: Schema, test_table_identifier: Identifier) -> None:
    """A caller-supplied sort order is stored as order id 1 with its field settings intact."""
    catalog.create_namespace(Catalog.namespace_from(test_table_identifier))
    requested_order = SortOrder(SortField(source_id=2, transform=IdentityTransform(), null_order=NullOrder.NULLS_FIRST))
    created = catalog.create_table(test_table_identifier, table_schema_nested, sort_order=requested_order)

    stored_order = created.sort_order()
    assert stored_order.order_id == 1, "Order ID must match"
    assert len(stored_order.fields) == 1, "Order must have 1 field"
    first_field = stored_order.fields[0]
    assert first_field.direction == SortDirection.ASC, "Direction must match"
    assert first_field.null_order == NullOrder.NULLS_FIRST, "Null order must match"
    assert isinstance(first_field.transform, IdentityTransform), "Transform must match"
    catalog.drop_table(test_table_identifier)
+
+
def test_create_table_with_default_warehouse_location(
    warehouse: Path, catalog_with_warehouse: Catalog, table_schema_nested: Schema, test_table_identifier: Identifier
) -> None:
    """Without an explicit location, the table's metadata file is written under the warehouse path."""
    expected_name = Catalog.identifier_to_tuple(test_table_identifier)
    catalog_with_warehouse.create_namespace(Catalog.namespace_from(test_table_identifier))
    catalog_with_warehouse.create_table(test_table_identifier, table_schema_nested)

    loaded = catalog_with_warehouse.load_table(test_table_identifier)
    local_metadata_file = loaded.metadata_location[len("file://") :]
    assert loaded.name() == expected_name
    assert loaded.metadata_location.startswith(f"file://{warehouse}")
    assert os.path.exists(local_metadata_file)
    catalog_with_warehouse.drop_table(test_table_identifier)
+
+
def test_create_table_location_override(
    catalog: Catalog,
    tmp_path: Path,
    table_schema_nested: Schema,
    test_table_identifier: Identifier,
    test_table_properties: dict[str, str],
) -> None:
    """An explicit ``location`` argument is honoured and survives a reload of the table."""
    partition_spec = PartitionSpec(PartitionField(name="x", transform=IdentityTransform(), source_id=1, field_id=1000))
    override_location = f"file://{tmp_path}/new_location"
    catalog.create_namespace(Catalog.namespace_from(test_table_identifier))

    created = catalog.create_table(
        identifier=test_table_identifier,
        schema=table_schema_nested,
        location=override_location,
        partition_spec=partition_spec,
        properties=test_table_properties,
    )

    assert catalog.load_table(test_table_identifier) == created
    assert created.location() == override_location
+
+
def test_create_table_removes_trailing_slash_from_location(
    warehouse: Path, catalog: Catalog, table_schema_nested: Schema, test_table_identifier: Identifier
) -> None:
    """A trailing ``/`` on the requested table location is dropped from the stored location."""
    identifier_tuple = Catalog.identifier_to_tuple(test_table_identifier)
    table_name = Catalog.table_name_from(identifier_tuple)
    expected_location = f"file://{warehouse}/{catalog.name}/{table_name}-given"

    catalog.create_namespace(Catalog.namespace_from(test_table_identifier))
    catalog.create_table(test_table_identifier, table_schema_nested, location=expected_location + "/")

    loaded = catalog.load_table(test_table_identifier)
    assert loaded.name() == identifier_tuple
    assert loaded.metadata_location.startswith(f"file://{warehouse}")
    assert os.path.exists(loaded.metadata_location[len("file://") :])
    assert loaded.location() == expected_location
    catalog.drop_table(test_table_identifier)
+
+
def test_create_tables_idempotency(catalog: Catalog) -> None:
    """Calling ``create_tables`` a second time must not fail when the tables already exist."""
    for _attempt in range(2):
        catalog.create_tables()  # type: ignore[attr-defined]
+
+
def test_create_table_pyarrow_schema(
    catalog: Catalog,
    pyarrow_schema_simple_without_ids: pa.Schema,
    test_table_identifier: Identifier,
    test_table_properties: dict[str, str],
) -> None:
    """A table can be created straight from a PyArrow schema without field ids."""
    catalog.create_namespace(Catalog.namespace_from(test_table_identifier))
    created = catalog.create_table(
        identifier=test_table_identifier,
        schema=pyarrow_schema_simple_without_ids,
        properties=test_table_properties,
    )
    reloaded = catalog.load_table(test_table_identifier)
    assert reloaded == created
+
+
def test_write_pyarrow_schema(catalog: Catalog, test_table_identifier: Identifier) -> None:
    """A table created from a PyArrow schema accepts and returns data written in that schema.

    The schema mixes nullable and non-nullable fields and uses ``large_string``.
    """
    pyarrow_table = pa.Table.from_arrays(
        [
            pa.array([None, "A", "B", "C"]),  # 'foo' column
            pa.array([1, 2, 3, 4]),  # 'bar' column
            pa.array([True, None, False, True]),  # 'baz' column
            pa.array([None, "A", "B", "C"]),  # 'large' column
        ],
        schema=pa.schema(
            [
                pa.field("foo", pa.large_string(), nullable=True),
                pa.field("bar", pa.int32(), nullable=False),
                pa.field("baz", pa.bool_(), nullable=True),
                pa.field("large", pa.large_string(), nullable=True),
            ]
        ),
    )
    namespace = Catalog.namespace_from(test_table_identifier)
    catalog.create_namespace(namespace)
    table = catalog.create_table(test_table_identifier, pyarrow_table.schema)
    table.append(pyarrow_table)

    # Without this the test only proved that append did not raise; also confirm the rows landed.
    assert len(table.scan().to_arrow()) == len(pyarrow_table)
+
+
@pytest.mark.parametrize(
    "schema,expected",
    [
        (lazy_fixture("pyarrow_schema_simple_without_ids"), lazy_fixture("iceberg_schema_simple_no_ids")),
        (lazy_fixture("table_schema_simple"), lazy_fixture("table_schema_simple")),
        (lazy_fixture("table_schema_nested"), lazy_fixture("table_schema_nested")),
        (lazy_fixture("pyarrow_schema_nested_without_ids"), lazy_fixture("iceberg_schema_nested_no_ids")),
    ],
)
def test_convert_schema_if_needed(
    schema: Schema | pa.Schema,
    expected: Schema,
    catalog: Catalog,
) -> None:
    """``_convert_schema_if_needed`` returns the expected Iceberg schema for Iceberg and PyArrow inputs.

    Iceberg schemas are expected back unchanged; PyArrow schemas are compared
    against the matching Iceberg schema fixture.
    """
    converted = catalog._convert_schema_if_needed(schema)
    assert expected == converted
+
+
+# Register table tests
+
+
def test_register_table(catalog: Catalog, test_table_identifier: Identifier, metadata_location: str) -> None:
    """Registering an existing metadata file exposes it as a table under the given identifier."""
    expected_name = Catalog.identifier_to_tuple(test_table_identifier)
    catalog.create_namespace(Catalog.namespace_from(test_table_identifier))

    registered = catalog.register_table(test_table_identifier, metadata_location)

    assert registered.name() == expected_name
    assert registered.metadata_location == metadata_location
    assert os.path.exists(metadata_location)
    catalog.drop_table(test_table_identifier)
+
+
def test_register_existing_table(catalog: Catalog, test_table_identifier: Identifier, metadata_location: str) -> None:
    """Registering the same identifier twice raises ``TableAlreadyExistsError`` on the second call."""
    catalog.create_namespace(Catalog.namespace_from(test_table_identifier))
    registration_args = (test_table_identifier, metadata_location)
    catalog.register_table(*registration_args)
    with pytest.raises(TableAlreadyExistsError):
        catalog.register_table(*registration_args)
+
+
+# Load table tests
+
+
def test_load_table(catalog: Catalog, table_schema_nested: Schema, test_table_identifier: Identifier) -> None:
    """A freshly loaded table matches the table object returned by ``create_table``."""
    catalog.create_namespace(Catalog.namespace_from(test_table_identifier))
    created = catalog.create_table(test_table_identifier, table_schema_nested)
    reloaded = catalog.load_table(test_table_identifier)
    assert created.name() == reloaded.name()
    assert created.metadata_location == reloaded.metadata_location
    assert created.metadata == reloaded.metadata
+
+
def test_load_table_from_self_identifier(
    catalog: Catalog, table_schema_nested: Schema, test_table_identifier: Identifier
) -> None:
    """The tuple returned by ``Table.name()`` can be passed back to ``load_table``."""
    expected_name = Catalog.identifier_to_tuple(test_table_identifier)
    catalog.create_namespace(Catalog.namespace_from(test_table_identifier))
    created = catalog.create_table(test_table_identifier, table_schema_nested)

    first_load = catalog.load_table(test_table_identifier)
    assert first_load.name() == expected_name

    second_load = catalog.load_table(first_load.name())
    assert created.name() == second_load.name()
    assert created.metadata_location == second_load.metadata_location
    assert created.metadata == second_load.metadata
+
+
+# Rename table tests
+
+
def test_rename_table(
    catalog: Catalog, table_schema_nested: Schema, test_table_identifier: Identifier, another_table_identifier: Identifier
) -> None:
    """Renaming moves the table to the new identifier, keeps its metadata file and frees the old name."""
    for identifier in (test_table_identifier, another_table_identifier):
        catalog.create_namespace(Catalog.namespace_from(identifier))

    source = catalog.create_table(test_table_identifier, table_schema_nested)
    assert source.name() == test_table_identifier

    catalog.rename_table(test_table_identifier, another_table_identifier)
    renamed = catalog.load_table(another_table_identifier)
    assert renamed.name() == another_table_identifier
    assert renamed.metadata_location == source.metadata_location

    with pytest.raises(NoSuchTableError):
        catalog.load_table(test_table_identifier)
+
+
def test_rename_table_from_self_identifier(
    catalog: Catalog, table_schema_nested: Schema, test_table_identifier: Identifier, another_table_identifier: Identifier
) -> None:
    """``rename_table`` accepts the tuple returned by ``Table.name()`` as the source identifier."""
    for identifier in (test_table_identifier, another_table_identifier):
        catalog.create_namespace(Catalog.namespace_from(identifier))

    source = catalog.create_table(test_table_identifier, table_schema_nested)
    assert source.name() == test_table_identifier

    catalog.rename_table(source.name(), another_table_identifier)
    renamed = catalog.load_table(another_table_identifier)
    assert renamed.name() == another_table_identifier
    assert renamed.metadata_location == source.metadata_location

    # Neither spelling of the old identifier may resolve any more.
    for stale_identifier in (source.name(), test_table_identifier):
        with pytest.raises(NoSuchTableError):
            catalog.load_table(stale_identifier)
+
+
def test_rename_table_to_existing_one(
    catalog: Catalog, table_schema_nested: Schema, test_table_identifier: Identifier, another_table_identifier: Identifier
) -> None:
    """Renaming onto an identifier that already holds a table raises ``TableAlreadyExistsError``."""
    for identifier in (test_table_identifier, another_table_identifier):
        catalog.create_namespace(Catalog.namespace_from(identifier))

    source = catalog.create_table(test_table_identifier, table_schema_nested)
    assert source.name() == test_table_identifier
    occupant = catalog.create_table(another_table_identifier, table_schema_nested)
    assert occupant.name() == another_table_identifier

    with pytest.raises(TableAlreadyExistsError):
        catalog.rename_table(test_table_identifier, another_table_identifier)
+
+
def test_rename_missing_table(catalog: Catalog, test_table_identifier: Identifier, another_table_identifier: Identifier) -> None:
    """Renaming a table that was never created raises ``NoSuchTableError``, even when both namespaces exist."""
    for identifier in (test_table_identifier, another_table_identifier):
        catalog.create_namespace(Catalog.namespace_from(identifier))
    with pytest.raises(NoSuchTableError):
        catalog.rename_table(test_table_identifier, another_table_identifier)
+
+
def test_rename_table_to_missing_namespace(
    catalog: Catalog, table_schema_nested: Schema, test_table_identifier: Identifier, another_table_identifier: Identifier
) -> None:
    """Renaming into a namespace that does not exist raises ``NoSuchNamespaceError``."""
    catalog.create_namespace(Catalog.namespace_from(test_table_identifier))
    source = catalog.create_table(test_table_identifier, table_schema_nested)
    assert source.name() == test_table_identifier
    # The destination namespace is deliberately never created.
    with pytest.raises(NoSuchNamespaceError):
        catalog.rename_table(test_table_identifier, another_table_identifier)
+
+
+# Drop table tests
+
+
def test_drop_table(catalog: Catalog, table_schema_nested: Schema, test_table_identifier: Identifier) -> None:
    """After ``drop_table`` the identifier can no longer be loaded."""
    expected_name = Catalog.identifier_to_tuple(test_table_identifier)
    catalog.create_namespace(Catalog.namespace_from(test_table_identifier))
    created = catalog.create_table(test_table_identifier, table_schema_nested)
    assert created.name() == expected_name

    catalog.drop_table(test_table_identifier)
    with pytest.raises(NoSuchTableError):
        catalog.load_table(test_table_identifier)
+
+
def test_drop_table_from_self_identifier(
    catalog: Catalog, table_schema_nested: Schema, test_table_identifier: Identifier
) -> None:
    """``drop_table`` accepts the tuple returned by ``Table.name()``."""
    expected_name = Catalog.identifier_to_tuple(test_table_identifier)
    catalog.create_namespace(Catalog.namespace_from(test_table_identifier))
    created = catalog.create_table(test_table_identifier, table_schema_nested)
    assert created.name() == expected_name

    catalog.drop_table(created.name())
    for stale_identifier in (created.name(), test_table_identifier):
        with pytest.raises(NoSuchTableError):
            catalog.load_table(stale_identifier)
+
+
def test_drop_table_that_does_not_exist_raise_error(catalog: Catalog, test_table_identifier: Identifier) -> None:
    """Dropping an identifier that was never created raises ``NoSuchTableError``."""
    missing_identifier = test_table_identifier
    with pytest.raises(NoSuchTableError):
        catalog.drop_table(missing_identifier)
+
+
def test_purge_table(catalog: Catalog, table_schema_simple: Schema, test_table_identifier: Identifier) -> None:
    """After ``purge_table`` a load of the identifier raises ``NoSuchTableError`` naming the table."""
    import re

    namespace = Catalog.namespace_from(test_table_identifier)
    catalog.create_namespace(namespace)
    catalog.create_table(test_table_identifier, table_schema_simple)
    catalog.purge_table(test_table_identifier)

    # ``match`` is a regular expression: escape the identifier so its '.' separators are
    # matched literally rather than as "any character".
    expected_message = re.escape(f"Table does not exist: {'.'.join(test_table_identifier)}")
    with pytest.raises(NoSuchTableError, match=expected_message):
        catalog.load_table(test_table_identifier)
+
+
+# List tables tests
+
+
def test_list_tables(
    catalog: Catalog, table_schema_nested: Schema, test_table_identifier: Identifier, another_table_identifier: Identifier
) -> None:
    """``list_tables`` returns exactly the table created in each namespace."""
    first_namespace = Catalog.namespace_from(test_table_identifier)
    second_namespace = Catalog.namespace_from(another_table_identifier)
    for namespace in (first_namespace, second_namespace):
        catalog.create_namespace(namespace)
    for identifier in (test_table_identifier, another_table_identifier):
        catalog.create_table(identifier, table_schema_nested)

    expectations = ((first_namespace, test_table_identifier), (second_namespace, another_table_identifier))
    for namespace, expected_identifier in expectations:
        listed = catalog.list_tables(namespace)
        assert len(listed) == 1
        assert expected_identifier in listed
+
+
def test_list_tables_under_a_namespace(catalog: Catalog, table_schema_nested: Schema, test_table_identifier: Identifier) -> None:
    """``list_tables`` is scoped to its namespace: a fresh namespace lists nothing."""
    populated_namespace = Catalog.namespace_from(test_table_identifier)
    catalog.create_namespace(populated_namespace)
    catalog.create_table(test_table_identifier, table_schema_nested)

    empty_namespace = ("new", "namespace")
    catalog.create_namespace(empty_namespace)

    populated_tables = catalog.list_tables(namespace=populated_namespace)
    empty_tables = catalog.list_tables(empty_namespace)
    assert populated_tables
    assert test_table_identifier in populated_tables
    assert empty_tables == []
+
+
def test_list_tables_when_missing_namespace(catalog: Catalog, test_namespace: Identifier) -> None:
    """Listing tables of a namespace that was never created raises ``NoSuchNamespaceError``."""
    missing_namespace = test_namespace
    with pytest.raises(NoSuchNamespaceError):
        catalog.list_tables(missing_namespace)
+
+
+# Commit table tests
def test_commit_table(catalog: Catalog, table_schema_nested: Schema, test_table_identifier: Identifier) -> None:
    """Committing a schema change through a transaction produces metadata version 1.

    Verifies the new current schema, the bumped ``last_updated_ms`` and that the
    previous metadata file is recorded as the single metadata-log entry.
    """
    catalog.create_namespace(Catalog.namespace_from(test_table_identifier))
    table = catalog.create_table(test_table_identifier, table_schema_nested)
    initial_location = table.metadata_location
    initial_updated_ms = table.metadata.last_updated_ms

    assert catalog._parse_metadata_version(initial_location) == 0  # type: ignore[attr-defined]
    assert table.metadata.current_schema_id == 0

    txn = table.transaction()
    schema_update = txn.update_schema()
    schema_update.add_column(path="b", field_type=IntegerType())
    schema_update.commit()
    txn.commit_transaction()

    metadata = table.metadata
    assert catalog._parse_metadata_version(table.metadata_location) == 1  # type: ignore[attr-defined]
    assert metadata.current_schema_id == 1
    assert len(metadata.schemas) == 2

    added_schema = next(candidate for candidate in metadata.schemas if candidate.schema_id == 1)
    assert added_schema
    assert added_schema == schema_update._apply()
    assert added_schema.find_field("b").field_type == IntegerType()
    assert metadata.last_updated_ms > initial_updated_ms

    assert len(metadata.metadata_log) == 1
    log_entry = metadata.metadata_log[0]
    assert log_entry.metadata_file == initial_location
    assert log_entry.timestamp_ms == initial_updated_ms
+
+
def test_catalog_commit_table_applies_schema_updates(
    catalog: Catalog,
    table_schema_nested: Schema,
    test_table_identifier: Identifier,
) -> None:
    """``commit_table`` with raw add/set-current schema updates installs the new schema."""
    catalog.create_namespace(Catalog.namespace_from(test_table_identifier))
    table = catalog.create_table(test_table_identifier, table_schema_nested)

    replacement_schema = Schema(
        NestedField(1, "x", LongType()),
        NestedField(2, "y", LongType(), doc="comment"),
        NestedField(3, "z", LongType()),
        NestedField(4, "add", LongType()),
    )
    schema_updates = (
        AddSchemaUpdate(schema=replacement_schema),
        SetCurrentSchemaUpdate(),
    )

    response = table.catalog.commit_table(table, updates=schema_updates, requirements=())

    committed = response.metadata
    assert committed.table_uuid == table.metadata.table_uuid
    assert len(committed.schemas) == 2
    assert committed.schemas[1] == replacement_schema
    assert committed.current_schema_id == replacement_schema.schema_id
+
+
def test_concurrent_commit_table(catalog: Catalog, table_schema_nested: Schema, test_table_identifier: Identifier) -> None:
    """A commit based on a stale table copy fails once another writer has changed the schema."""
    catalog.create_namespace(Catalog.namespace_from(test_table_identifier))
    first_writer = catalog.create_table(test_table_identifier, table_schema_nested)
    second_writer = catalog.load_table(test_table_identifier)

    with first_writer.update_schema() as update:
        update.add_column(path="b", field_type=IntegerType())

    # second_writer was loaded before first_writer's commit, so its schema-id requirement no longer holds.
    expected_error = "Requirement failed: current schema id has changed: expected 0, found 1"
    with pytest.raises(CommitFailedException, match=expected_error):
        with second_writer.update_schema() as update:
            update.add_column(path="c", field_type=IntegerType())
+
+
def test_delete_metadata_multiple(catalog: Catalog, table_schema_nested: Schema, random_table_identifier: Identifier) -> None:
    """Old metadata files are removed once the retention limit is set and delete-after-commit is enabled."""

    def local_path(uri: str) -> str:
        # Strip the ``file://`` scheme so the path can be checked on the local file system.
        return uri[len("file://") :]

    catalog.create_namespace(Catalog.namespace_from(random_table_identifier))
    table = catalog.create_table(random_table_identifier, table_schema_nested)
    first_metadata_file = table.metadata_location

    for idx in range(5):
        with table.transaction() as txn, txn.update_schema() as update:
            update.add_column(path=f"new_column_{idx}", field_type=IntegerType())

    assert len(table.metadata.metadata_log) == 5
    assert os.path.exists(local_path(first_metadata_file))

    # Keep at most two previous metadata files and delete older ones after each commit.
    with table.transaction() as txn:
        txn.set_properties(
            properties={
                TableProperties.METADATA_PREVIOUS_VERSIONS_MAX: "2",
                TableProperties.METADATA_DELETE_AFTER_COMMIT_ENABLED: "true",
            }
        )

    assert len(table.metadata.metadata_log) == 2
    older_entry, newer_entry = table.metadata.metadata_log

    # One more commit pushes the older retained entry out of the log and off disk.
    with table.transaction() as txn, txn.update_schema() as update:
        update.add_column(path="new_column_x", field_type=IntegerType())

    assert len(table.metadata.metadata_log) == 2
    assert not os.path.exists(local_path(first_metadata_file))
    assert not os.path.exists(local_path(older_entry.metadata_file))
    assert os.path.exists(local_path(newer_entry.metadata_file))
+
+
+# Table properties tests
+
+
def test_table_properties_int_value(catalog: Catalog, table_schema_simple: Schema, test_table_identifier: Identifier) -> None:
    """An integer property value is accepted and stored as its string representation."""
    namespace = Catalog.namespace_from(test_table_identifier)
    catalog.create_namespace(namespace)
    property_with_int = {"property_name": 42}
    table = catalog.create_table(test_table_identifier, table_schema_simple, properties=property_with_int)
    assert isinstance(table.properties["property_name"], str)
    # Pin the serialized value as well as the type.
    assert table.properties["property_name"] == "42"
+
+
def test_table_properties_raise_for_none_value(
    catalog: Catalog, table_schema_simple: Schema, test_table_identifier: Identifier
) -> None:
    """A ``None`` property value is rejected with a ``ValidationError`` naming the offending key."""
    catalog.create_namespace(Catalog.namespace_from(test_table_identifier))
    invalid_properties = {"property_name": None}
    with pytest.raises(ValidationError) as exc_info:
        catalog.create_table(test_table_identifier, table_schema_simple, properties=invalid_properties)
    assert "None type is not a supported value in properties: property_name" in str(exc_info.value)
+
+
+# Append table
+
+
def test_append_table(catalog: Catalog, table_schema_simple: Schema, test_table_identifier: Identifier) -> None:
    """A single append yields one APPEND snapshot with matching summary counters, and the data reads back."""
    catalog.create_namespace(Catalog.namespace_from(test_table_identifier))
    table = catalog.create_table(test_table_identifier, table_schema_simple)

    df = pa.Table.from_pydict(
        {"foo": ["a"], "bar": [1], "baz": [True]},
        schema=schema_to_pyarrow(table_schema_simple),
    )
    table.append(df)

    snapshots = table.metadata.snapshots
    assert len(snapshots) == 1
    snapshot = snapshots[0]
    assert snapshot.snapshot_id == table.metadata.current_snapshot_id
    assert snapshot.parent_snapshot_id is None
    assert snapshot.sequence_number == 1

    summary = snapshot.summary
    assert summary is not None
    assert summary.operation == Operation.APPEND
    for counter in ("added-data-files", "added-records", "total-data-files", "total-records"):
        assert summary[counter] == "1"
    assert len(table.metadata.metadata_log) == 1

    # The scanned data must equal what was written.
    assert df == table.scan().to_arrow()
+
+
+# Test writes
def test_table_writes_metadata_to_custom_location(
    catalog: Catalog,
    test_table_identifier: Identifier,
    table_schema_simple: Schema,
    warehouse: Path,
) -> None:
    """With ``WRITE_METADATA_PATH`` set, metadata files and manifests are written under that path."""
    custom_metadata_path = f"file://{warehouse}/custom/path"
    catalog.create_namespace(Catalog.namespace_from(test_table_identifier))
    table = catalog.create_table(
        identifier=test_table_identifier,
        schema=table_schema_simple,
        properties={TableProperties.WRITE_METADATA_PATH: custom_metadata_path},
    )
    table.append(
        pa.Table.from_pydict(
            {"foo": ["a"], "bar": [1], "baz": [True]},
            schema=schema_to_pyarrow(table_schema_simple),
        )
    )

    snapshot = table.current_snapshot()
    assert snapshot is not None
    manifests = snapshot.manifests(table.io)
    provider = table.location_provider()

    assert provider.new_metadata_location("").startswith(custom_metadata_path)
    assert manifests[0].manifest_path.startswith(custom_metadata_path)
    assert table.location() != custom_metadata_path
    assert table.metadata_location.startswith(custom_metadata_path)
+
+
def test_table_writes_metadata_to_default_path(
    catalog: Catalog,
    test_table_identifier: Identifier,
    table_schema_simple: Schema,
    test_table_properties: dict[str, str],
) -> None:
    """With the fixture properties, metadata files and manifests land under ``<table location>/metadata``."""
    catalog.create_namespace(Catalog.namespace_from(test_table_identifier))
    table = catalog.create_table(
        identifier=test_table_identifier,
        schema=table_schema_simple,
        properties=test_table_properties,
    )
    expected_prefix = f"{table.location()}/metadata"
    table.append(
        pa.Table.from_pydict(
            {"foo": ["a"], "bar": [1], "baz": [True]},
            schema=schema_to_pyarrow(table_schema_simple),
        )
    )

    snapshot = table.current_snapshot()
    assert snapshot is not None
    manifests = snapshot.manifests(table.io)
    provider = table.location_provider()

    assert provider.new_metadata_location("").startswith(expected_prefix)
    assert manifests[0].manifest_path.startswith(expected_prefix)
    assert table.metadata_location.startswith(expected_prefix)
+
+
def test_table_metadata_writes_reflect_latest_path(
    catalog: Catalog,
    test_table_identifier: Identifier,
    table_schema_simple: Schema,
    warehouse: Path,
) -> None:
    """Changing ``WRITE_METADATA_PATH`` after creation redirects subsequent metadata locations."""
    catalog.create_namespace(Catalog.namespace_from(test_table_identifier))
    table = catalog.create_table(identifier=test_table_identifier, schema=table_schema_simple)

    default_metadata_dir = f"{table.location()}/metadata"
    assert table.location_provider().new_metadata_location("metadata.json") == f"{default_metadata_dir}/metadata.json"

    # Point metadata writes at a custom directory and commit the property change.
    custom_metadata_dir = f"file://{warehouse}/custom/path"
    (
        table.transaction()
        .set_properties({TableProperties.WRITE_METADATA_PATH: custom_metadata_dir})
        .commit_transaction()
    )

    assert table.location_provider().new_metadata_location("metadata.json") == f"{custom_metadata_dir}/metadata.json"
+
+
@pytest.mark.parametrize("format_version", [1, 2])
def test_write_and_evolve(catalog: Catalog, format_version: int) -> None:
    """Evolve the schema, then append, overwrite and delete within one transaction.

    The test passes when every staged operation commits without raising.
    """
    identifier = f"default.arrow_write_data_and_evolve_schema_v{format_version}"

    # Same helper the manifest-merge test uses instead of a hand-rolled try/except.
    catalog.create_namespace_if_not_exists("default")

    try:
        catalog.drop_table(identifier=identifier)
    except NoSuchTableError:
        pass

    pa_table = pa.Table.from_pydict(
        {
            "foo": ["a", None, "z"],
        },
        schema=pa.schema([pa.field("foo", pa.large_string(), nullable=True)]),
    )

    tbl = catalog.create_table(identifier=identifier, schema=pa_table.schema, properties={"format-version": str(format_version)})

    pa_table_with_column = pa.Table.from_pydict(
        {
            "foo": ["a", None, "z"],
            "bar": [19, None, 25],
        },
        schema=pa.schema(
            [
                pa.field("foo", pa.large_string(), nullable=True),
                pa.field("bar", pa.int32(), nullable=True),
            ]
        ),
    )

    with tbl.transaction() as txn:
        with txn.update_schema() as schema_txn:
            schema_txn.union_by_name(pa_table_with_column.schema)

        txn.append(pa_table_with_column)
        txn.overwrite(pa_table_with_column)
        txn.delete("foo = 'a'")
+
+
+# Merge manifests
@pytest.mark.parametrize("format_version", [1, 2])
def test_merge_manifests_local_file_system(catalog: Catalog, arrow_table_with_null: pa.Table, format_version: int) -> None:
    """With manifest merging enabled, five appends collapse into a single manifest.

    Catches the manifest file name collision bug during merge:
    https://github.com/apache/iceberg-python/pull/363#discussion_r1660691918
    """
    catalog.create_namespace_if_not_exists("default")
    try:
        catalog.drop_table("default.test_merge_manifest")
    except NoSuchTableError:
        pass
    tbl = catalog.create_table(
        "default.test_merge_manifest",
        arrow_table_with_null.schema,
        properties={
            "commit.manifest-merge.enabled": "true",
            "commit.manifest.min-count-to-merge": "2",
            # Property values are strings; pass the version the same way the other tests do.
            "format-version": str(format_version),
        },
    )

    for _ in range(5):
        tbl.append(arrow_table_with_null)

    assert len(tbl.scan().to_arrow()) == 5 * len(arrow_table_with_null)
    current_snapshot = tbl.current_snapshot()
    assert current_snapshot
    manifests = current_snapshot.manifests(tbl.io)
    assert len(manifests) == 1
+
+
+# Add column to table
+
+
def test_add_column(catalog: Catalog, table_schema_simple: Schema, random_table_identifier: Identifier) -> None:
    """Columns added via ``update_schema().commit()`` appear with new ids and bump the schema id."""
    catalog.create_namespace(Catalog.namespace_from(random_table_identifier))
    table = catalog.create_table(random_table_identifier, table_schema_simple)

    table.update_schema().add_column(path="new_column1", field_type=IntegerType()).commit()

    fields_after_first_add = [
        NestedField(field_id=1, name="foo", field_type=StringType(), required=False),
        NestedField(field_id=2, name="bar", field_type=IntegerType(), required=True),
        NestedField(field_id=3, name="baz", field_type=BooleanType(), required=False),
        NestedField(field_id=4, name="new_column1", field_type=IntegerType(), required=False),
    ]
    assert table.schema() == Schema(*fields_after_first_add, schema_id=1, identifier_field_ids=[2])
    assert table.schema().schema_id == 1

    # Second change goes through an explicit transaction.
    txn = table.transaction()
    txn.update_schema().add_column(path="new_column2", field_type=IntegerType(), doc="doc").commit()
    txn.commit_transaction()

    assert table.schema() == Schema(
        *fields_after_first_add,
        NestedField(field_id=5, name="new_column2", field_type=IntegerType(), required=False, doc="doc"),
        identifier_field_ids=[2],
    )
    assert table.schema().schema_id == 2
+
+
def test_add_column_with_statement(catalog: Catalog, table_schema_simple: Schema, random_table_identifier: Identifier) -> None:
    """Columns added inside ``with`` blocks are committed on exit and bump the schema id."""
    catalog.create_namespace(Catalog.namespace_from(random_table_identifier))
    table = catalog.create_table(random_table_identifier, table_schema_simple)

    with table.update_schema() as schema_update:
        schema_update.add_column(path="new_column1", field_type=IntegerType())

    fields_after_first_add = [
        NestedField(field_id=1, name="foo", field_type=StringType(), required=False),
        NestedField(field_id=2, name="bar", field_type=IntegerType(), required=True),
        NestedField(field_id=3, name="baz", field_type=BooleanType(), required=False),
        NestedField(field_id=4, name="new_column1", field_type=IntegerType(), required=False),
    ]
    assert table.schema() == Schema(*fields_after_first_add, identifier_field_ids=[2])
    assert table.schema().schema_id == 1

    with table.transaction() as txn:
        txn.update_schema().add_column(path="new_column2", field_type=IntegerType(), doc="doc").commit()

    assert table.schema() == Schema(
        *fields_after_first_add,
        NestedField(field_id=5, name="new_column2", field_type=IntegerType(), required=False, doc="doc"),
        identifier_field_ids=[2],
    )
    assert table.schema().schema_id == 2
+
+
+# Namespace tests
+
+
def test_create_namespace(catalog: Catalog, test_namespace: Identifier, test_table_properties: dict[str, str]) -> None:
    """A created namespace exists, its first segment is listed, and its properties are stored."""
    catalog.create_namespace(test_namespace, test_table_properties)
    assert catalog._namespace_exists(test_namespace)  # type: ignore[attr-defined]
    leading_segment = Catalog.identifier_to_tuple(test_namespace)[:1]
    assert leading_segment in catalog.list_namespaces()
    assert test_table_properties == catalog.load_namespace_properties(test_namespace)
+
+
def test_create_namespace_raises_error_on_existing_namespace(
    catalog: Catalog, test_namespace: Identifier, test_table_properties: dict[str, str]
) -> None:
    """Creating a namespace a second time raises ``NamespaceAlreadyExistsError``."""
    create_args = (test_namespace, test_table_properties)
    catalog.create_namespace(*create_args)
    with pytest.raises(NamespaceAlreadyExistsError):
        catalog.create_namespace(*create_args)
+
+
def test_create_namespace_if_not_exists(catalog: Catalog, database_name: str) -> None:
    """``create_namespace_if_not_exists`` is a no-op for an existing namespace."""
    expected_entry = (database_name,)
    catalog.create_namespace(database_name)
    assert expected_entry in catalog.list_namespaces()
    catalog.create_namespace_if_not_exists(database_name)
    assert expected_entry in catalog.list_namespaces()
+
+
def test_create_namespaces_sharing_same_prefix(catalog: Catalog, test_namespace: Identifier) -> None:
    """A child namespace can be created after its parent, and both then exist."""
    child_namespace = test_namespace + ("child",)
    # Parent first
    catalog.create_namespace(test_namespace)
    # Then child
    catalog.create_namespace(child_namespace)
    # Previously only "no exception" was checked; also verify both namespaces were created.
    assert catalog._namespace_exists(test_namespace)  # type: ignore[attr-defined]
    assert catalog._namespace_exists(child_namespace)  # type: ignore[attr-defined]
+
+
def test_create_namespace_with_comment_and_location(catalog: Catalog, test_namespace: Identifier) -> None:
    """``comment`` and ``location`` namespace properties are stored and returned verbatim."""
    expected_comment = "this is a test description"
    expected_location = "/test/location"
    catalog.create_namespace(
        namespace=test_namespace,
        properties={"comment": expected_comment, "location": expected_location},
    )
    assert Catalog.identifier_to_tuple(test_namespace)[:1] in catalog.list_namespaces()

    stored = catalog.load_namespace_properties(test_namespace)
    assert stored["comment"] == expected_comment
    assert stored["location"] == expected_location
+
+
[email protected]("ignore")
+def test_create_namespace_with_null_properties(catalog: Catalog,
test_namespace: Identifier) -> None:
+ with pytest.raises(IntegrityError):
+ catalog.create_namespace(namespace=test_namespace, properties={None:
"value"}) # type: ignore
+
+ with pytest.raises(IntegrityError):
+ catalog.create_namespace(namespace=test_namespace, properties={"key":
None})
+
+
[email protected]("empty_namespace", ["", (), (""), ("", ""), " ", ("
")])
+def test_create_namespace_with_empty_identifier(catalog: Catalog,
empty_namespace: Any) -> None:
+ with pytest.raises(NoSuchNamespaceError):
+ catalog.create_namespace(empty_namespace)
+
+
+# Get namespace tests
+
+
+def
test_get_namespace_metadata_raises_error_when_namespace_does_not_exist(catalog:
Catalog, test_namespace: Identifier) -> None:
+ namespace = ".".join(test_namespace)
+ with pytest.raises(NoSuchNamespaceError, match=f"Namespace {namespace}
does not exists"):
+ catalog.load_namespace_properties(test_namespace)
+
+
+def test_namespace_exists(catalog: Catalog) -> None:
+ for ns in [("db1",), ("db1", "ns1"), ("db2", "ns1"), ("db3", "ns1",
"ns2")]:
+ catalog.create_namespace(ns)
+ assert catalog._namespace_exists(ns) # type: ignore[attr-defined]
+
+ # `db2` exists because `db2.ns1` exists
+ assert catalog._namespace_exists("db2") # type: ignore[attr-defined]
+ # `db3.ns1` exists because `db3.ns1.ns2` exists
+ assert catalog._namespace_exists("db3.ns1") # type: ignore[attr-defined]
+ # make sure '_' is escaped in the query
+ assert not catalog._namespace_exists("db_") # type: ignore[attr-defined]
+ # make sure '%' is escaped in the query
+ assert not catalog._namespace_exists("db%") # type: ignore[attr-defined]
+
+
+# Namespace properties
+
+
+def test_load_namespace_properties(catalog: Catalog, test_namespace:
Identifier) -> None:
+ warehouse_location = "/test/location"
+ test_properties = {
+ "comment": "this is a test description",
+ "location": f"{warehouse_location}/{test_namespace}",
+ "test_property1": "1",
+ "test_property2": "2",
+ "test_property3": "3",
+ }
+
+ catalog.create_namespace(test_namespace, test_properties)
+ listed_properties = catalog.load_namespace_properties(test_namespace)
+ for k, v in listed_properties.items():
+ assert k in test_properties
+ assert v == test_properties[k]
+
+
+def test_load_namespace_properties_non_existing_namespace(catalog: Catalog) ->
None:
+ with pytest.raises(NoSuchNamespaceError):
+ catalog.load_namespace_properties("does_not_exist")
+
+
+def test_load_empty_namespace_properties(catalog: Catalog, test_namespace:
Identifier) -> None:
+ catalog.create_namespace(test_namespace)
+ listed_properties = catalog.load_namespace_properties(test_namespace)
+ assert listed_properties == {"exists": "true"}
+
+
+# List namespaces tests
+
+
+def test_list_namespaces(catalog: Catalog) -> None:
+ namespace_list = ["db", "db.ns1", "db.ns1.ns2", "db.ns2", "db2",
"db2.ns1", "db%"]
+ for namespace in namespace_list:
+ if not catalog._namespace_exists(namespace): # type:
ignore[attr-defined]
+ catalog.create_namespace(namespace)
+
+ ns_list = catalog.list_namespaces()
+ for ns in [("db",), ("db%",), ("db2",)]:
+ assert ns in ns_list
+
+ ns_list = catalog.list_namespaces("db")
+ assert sorted(ns_list) == [("db", "ns1"), ("db", "ns2")]
+
+ ns_list = catalog.list_namespaces("db.ns1")
+ assert sorted(ns_list) == [("db", "ns1", "ns2")]
+
+ ns_list = catalog.list_namespaces("db.ns1.ns2")
+ assert len(ns_list) == 0
+
+
+def test_list_namespaces_fuzzy_match(catalog: Catalog) -> None:
+ namespace_list = ["db.ns1", "db.ns1.ns2", "db.ns2", "db.ns1X.ns3",
"db_.ns1.ns2", "db2.ns1.ns2"]
+ for namespace in namespace_list:
+ if not catalog._namespace_exists(namespace): # type:
ignore[attr-defined]
+ catalog.create_namespace(namespace)
+
+ assert catalog.list_namespaces("db.ns1") == [("db", "ns1", "ns2")]
+
+ assert catalog.list_namespaces("db_.ns1") == [("db_", "ns1", "ns2")]
+
+
+def test_list_non_existing_namespaces(catalog: Catalog) -> None:
+ with pytest.raises(NoSuchNamespaceError):
+ catalog.list_namespaces("does_not_exist")
+
+
+# Update namespace properties tests
+
+
+def test_update_namespace_properties(catalog: Catalog, test_namespace:
Identifier) -> None:
+ warehouse_location = "/test/location"
+ test_properties = {
+ "comment": "this is a test description",
+ "location": f"{warehouse_location}/{test_namespace}",
+ "test_property1": "1",
+ "test_property2": "2",
+ "test_property3": "3",
+ }
+ removals = {"test_property1", "test_property2", "test_property3",
"should_not_removed"}
+ updates = {"test_property4": "4", "test_property5": "5", "comment":
"updated test description"}
+ catalog.create_namespace(test_namespace, test_properties)
+ update_report = catalog.update_namespace_properties(test_namespace,
removals, updates)
+ for k in updates.keys():
+ assert k in update_report.updated
+ for k in removals:
+ if k == "should_not_removed":
+ assert k in update_report.missing
+ else:
+ assert k in update_report.removed
+ assert catalog.load_namespace_properties(test_namespace) == {
+ "comment": "updated test description",
+ "test_property4": "4",
+ "test_property5": "5",
+ "location": f"{warehouse_location}/{test_namespace}",
+ }
+
+
+def test_update_namespace_metadata_raises_error_when_namespace_does_not_exist(
+ catalog: Catalog, test_namespace: Identifier, test_table_properties:
dict[str, str]
+) -> None:
+ namespace = ".".join(test_namespace)
+ with pytest.raises(NoSuchNamespaceError, match=f"Namespace {namespace}
does not exists"):
+ catalog.update_namespace_properties(test_namespace,
updates=test_table_properties)
+
+
+def test_update_namespace_metadata(catalog: Catalog, test_namespace:
Identifier, test_table_properties: dict[str, str]) -> None:
+ catalog.create_namespace(test_namespace, test_table_properties)
+ new_metadata = {"key3": "value3", "key4": "value4"}
+ summary = catalog.update_namespace_properties(test_namespace,
updates=new_metadata)
+ assert catalog._namespace_exists(test_namespace) # type:
ignore[attr-defined]
+ assert new_metadata.items() <=
catalog.load_namespace_properties(test_namespace).items()
+ assert summary.removed == []
+ assert sorted(summary.updated) == ["key3", "key4"]
+ assert summary.missing == []
+
+
+def test_update_namespace_metadata_removals(
+ catalog: Catalog, test_namespace: Identifier, test_table_properties:
dict[str, str]
+) -> None:
+ catalog.create_namespace(test_namespace, test_table_properties)
+ new_metadata = {"key3": "value3", "key4": "value4"}
+ remove_metadata = {"key1"}
+ summary = catalog.update_namespace_properties(test_namespace,
remove_metadata, new_metadata)
+ assert catalog._namespace_exists(test_namespace) # type:
ignore[attr-defined]
+ assert new_metadata.items() <=
catalog.load_namespace_properties(test_namespace).items()
+ assert
remove_metadata.isdisjoint(catalog.load_namespace_properties(test_namespace).keys())
+ assert summary.removed == ["key1"]
+ assert sorted(summary.updated) == ["key3", "key4"]
+ assert summary.missing == []
+
+
+# Drop namespace tests
+
+
+def test_drop_namespace(catalog: Catalog, table_schema_nested: Schema,
test_table_identifier: Identifier) -> None:
+ namespace = Catalog.namespace_from(test_table_identifier)
+ catalog.create_namespace(namespace)
+ assert catalog._namespace_exists(namespace) # type: ignore[attr-defined]
+ catalog.create_table(test_table_identifier, table_schema_nested)
+ with pytest.raises(NamespaceNotEmptyError):
+ catalog.drop_namespace(namespace)
+ catalog.drop_table(test_table_identifier)
+ catalog.drop_namespace(namespace)
+ assert not catalog._namespace_exists(namespace) # type:
ignore[attr-defined]
+
+
+def test_drop_namespace_raises_error_when_namespace_does_not_exist(catalog:
Catalog) -> None:
+ with pytest.raises(NoSuchNamespaceError):
+ catalog.drop_namespace("does_not_exist")
+
+
+def test_drop_namespace_raises_error_when_namespace_not_empty(
+ catalog: Catalog, table_schema_nested: Schema, test_table_identifier:
Identifier
+) -> None:
+ namespace = Catalog.namespace_from(test_table_identifier)
+ catalog.create_namespace(namespace)
+ catalog.create_table(test_table_identifier, table_schema_nested)
+ with pytest.raises(NamespaceNotEmptyError, match=f"Namespace
{'.'.join(namespace)} is not empty"):
+ catalog.drop_namespace(namespace)
diff --git a/tests/catalog/test_sql.py b/tests/catalog/test_sql.py
index 22b9883c..f6846195 100644
--- a/tests/catalog/test_sql.py
+++ b/tests/catalog/test_sql.py
@@ -15,22 +15,15 @@
# specific language governing permissions and limitations
# under the License.
-import os
from collections.abc import Generator
from pathlib import Path
-from typing import Any, cast
+from typing import cast
-import pyarrow as pa
import pytest
-from pydantic_core import ValidationError
-from pytest_lazyfixture import lazy_fixture
from sqlalchemy import Engine, create_engine, inspect
-from sqlalchemy.exc import ArgumentError, IntegrityError
+from sqlalchemy.exc import ArgumentError
-from pyiceberg.catalog import (
- Catalog,
- load_catalog,
-)
+from pyiceberg.catalog import load_catalog
from pyiceberg.catalog.sql import (
DEFAULT_ECHO_VALUE,
DEFAULT_POOL_PRE_PING_VALUE,
@@ -39,29 +32,11 @@ from pyiceberg.catalog.sql import (
SqlCatalogBaseTable,
)
from pyiceberg.exceptions import (
- CommitFailedException,
- NamespaceAlreadyExistsError,
- NamespaceNotEmptyError,
- NoSuchNamespaceError,
NoSuchPropertyException,
- NoSuchTableError,
TableAlreadyExistsError,
)
-from pyiceberg.io import FSSPEC_FILE_IO, PY_IO_IMPL
-from pyiceberg.io.pyarrow import _dataframe_to_data_files, schema_to_pyarrow
-from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC
from pyiceberg.schema import Schema
-from pyiceberg.table import TableProperties
-from pyiceberg.table.snapshots import Operation
-from pyiceberg.table.sorting import (
- NullOrder,
- SortDirection,
- SortField,
- SortOrder,
-)
-from pyiceberg.transforms import IdentityTransform
-from pyiceberg.typedef import Identifier
-from pyiceberg.types import IntegerType, NestedField, StringType, strtobool
+from pyiceberg.types import NestedField, StringType, strtobool
CATALOG_TABLES = [c.__tablename__ for c in
SqlCatalogBaseTable.__subclasses__()]
@@ -71,36 +46,6 @@ def catalog_name() -> str:
return "test_sql_catalog"
[email protected](name="random_table_identifier")
-def fixture_random_table_identifier(warehouse: Path, database_name: str,
table_name: str) -> Identifier:
- os.makedirs(f"{warehouse}/{database_name}/{table_name}/metadata/",
exist_ok=True)
- return database_name, table_name
-
-
[email protected](name="another_random_table_identifier")
-def fixture_another_random_table_identifier(warehouse: Path, database_name:
str, table_name: str) -> Identifier:
- database_name = database_name + "_new"
- table_name = table_name + "_new"
- os.makedirs(f"{warehouse}/{database_name}/{table_name}/metadata/",
exist_ok=True)
- return database_name, table_name
-
-
[email protected](name="random_hierarchical_identifier")
-def fixture_random_hierarchical_identifier(warehouse: Path,
hierarchical_namespace_name: str, table_name: str) -> Identifier:
-
os.makedirs(f"{warehouse}/{hierarchical_namespace_name}/{table_name}/metadata/",
exist_ok=True)
- return Catalog.identifier_to_tuple(".".join((hierarchical_namespace_name,
table_name)))
-
-
[email protected](name="another_random_hierarchical_identifier")
-def fixture_another_random_hierarchical_identifier(
- warehouse: Path, hierarchical_namespace_name: str, table_name: str
-) -> Identifier:
- hierarchical_namespace_name = hierarchical_namespace_name + "_new"
- table_name = table_name + "_new"
-
os.makedirs(f"{warehouse}/{hierarchical_namespace_name}/{table_name}/metadata/",
exist_ok=True)
- return Catalog.identifier_to_tuple(".".join((hierarchical_namespace_name,
table_name)))
-
-
@pytest.fixture(scope="module")
def catalog_memory(catalog_name: str, warehouse: Path) ->
Generator[SqlCatalog, None, None]:
props = {
@@ -135,32 +80,6 @@ def alchemy_engine(catalog_uri: str) -> Engine:
return create_engine(catalog_uri)
[email protected](scope="module")
-def catalog_sqlite_without_rowcount(catalog_name: str, warehouse: Path) ->
Generator[SqlCatalog, None, None]:
- props = {
- "uri": f"sqlite:////{warehouse}/sql-catalog",
- "warehouse": f"file://{warehouse}",
- }
- catalog = SqlCatalog(catalog_name, **props)
- catalog.engine.dialect.supports_sane_rowcount = False
- catalog.create_tables()
- yield catalog
- catalog.destroy_tables()
-
-
[email protected](scope="module")
-def catalog_sqlite_fsspec(catalog_name: str, warehouse: Path) ->
Generator[SqlCatalog, None, None]:
- props = {
- "uri": f"sqlite:////{warehouse}/sql-catalog",
- "warehouse": f"file://{warehouse}",
- PY_IO_IMPL: FSSPEC_FILE_IO,
- }
- catalog = SqlCatalog(catalog_name, **props)
- catalog.create_tables()
- yield catalog
- catalog.destroy_tables()
-
-
def test_creation_with_no_uri(catalog_name: str) -> None:
with pytest.raises(NoSuchPropertyException):
SqlCatalog(catalog_name, not_uri="unused")
@@ -291,1426 +210,6 @@ def test_creation_when_all_tables_exists(alchemy_engine:
Engine, catalog_name: s
confirm_all_tables_exist(catalog)
[email protected](
- "catalog",
- [
- lazy_fixture("catalog_memory"),
- lazy_fixture("catalog_sqlite"),
- ],
-)
-def test_create_tables_idempotency(catalog: SqlCatalog) -> None:
- # Second initialization should not fail even if tables are already created
- catalog.create_tables()
- catalog.create_tables()
-
-
[email protected](
- "catalog",
- [
- lazy_fixture("catalog_memory"),
- lazy_fixture("catalog_sqlite"),
- ],
-)
[email protected](
- "table_identifier",
- [
- lazy_fixture("random_table_identifier"),
- lazy_fixture("random_hierarchical_identifier"),
- ],
-)
-def test_create_table_default_sort_order(catalog: SqlCatalog,
table_schema_nested: Schema, table_identifier: Identifier) -> None:
- namespace = Catalog.namespace_from(table_identifier)
- catalog.create_namespace(namespace)
- table = catalog.create_table(table_identifier, table_schema_nested)
- assert table.sort_order().order_id == 0, "Order ID must match"
- assert table.sort_order().is_unsorted is True, "Order must be unsorted"
- catalog.drop_table(table_identifier)
-
-
[email protected](
- "catalog",
- [
- lazy_fixture("catalog_memory"),
- lazy_fixture("catalog_sqlite"),
- ],
-)
[email protected](
- "table_identifier",
- [
- lazy_fixture("random_table_identifier"),
- lazy_fixture("random_hierarchical_identifier"),
- ],
-)
-def test_create_v1_table(catalog: SqlCatalog, table_schema_nested: Schema,
table_identifier: Identifier) -> None:
- namespace = Catalog.namespace_from(table_identifier)
- catalog.create_namespace(namespace)
- table = catalog.create_table(table_identifier, table_schema_nested,
properties={"format-version": "1"})
- assert table.sort_order().order_id == 0, "Order ID must match"
- assert table.sort_order().is_unsorted is True, "Order must be unsorted"
- assert table.format_version == 1
- assert table.spec() == UNPARTITIONED_PARTITION_SPEC
- catalog.drop_table(table_identifier)
-
-
[email protected](
- "catalog",
- [
- lazy_fixture("catalog_memory"),
- lazy_fixture("catalog_sqlite"),
- ],
-)
[email protected](
- "table_identifier",
- [
- lazy_fixture("random_table_identifier"),
- lazy_fixture("random_hierarchical_identifier"),
- ],
-)
-def test_create_table_with_pyarrow_schema(
- catalog: SqlCatalog,
- pyarrow_schema_simple_without_ids: pa.Schema,
- iceberg_table_schema_simple: Schema,
- table_identifier: Identifier,
-) -> None:
- namespace = Catalog.namespace_from(table_identifier)
- catalog.create_namespace(namespace)
- table = catalog.create_table(table_identifier,
pyarrow_schema_simple_without_ids)
- assert table.schema() == iceberg_table_schema_simple
- catalog.drop_table(table_identifier)
-
-
[email protected](
- "catalog",
- [
- lazy_fixture("catalog_memory"),
- lazy_fixture("catalog_sqlite"),
- ],
-)
[email protected](
- "table_identifier",
- [
- lazy_fixture("random_table_identifier"),
- lazy_fixture("random_hierarchical_identifier"),
- ],
-)
-def test_write_pyarrow_schema(catalog: SqlCatalog, table_identifier:
Identifier) -> None:
- import pyarrow as pa
-
- pyarrow_table = pa.Table.from_arrays(
- [
- pa.array([None, "A", "B", "C"]), # 'foo' column
- pa.array([1, 2, 3, 4]), # 'bar' column
- pa.array([True, None, False, True]), # 'baz' column
- pa.array([None, "A", "B", "C"]), # 'large' column
- ],
- schema=pa.schema(
- [
- pa.field("foo", pa.large_string(), nullable=True),
- pa.field("bar", pa.int32(), nullable=False),
- pa.field("baz", pa.bool_(), nullable=True),
- pa.field("large", pa.large_string(), nullable=True),
- ]
- ),
- )
- namespace = Catalog.namespace_from(table_identifier)
- catalog.create_namespace(namespace)
- table = catalog.create_table(table_identifier, pyarrow_table.schema)
- table.append(pyarrow_table)
-
-
[email protected](
- "catalog",
- [
- lazy_fixture("catalog_memory"),
- lazy_fixture("catalog_sqlite"),
- ],
-)
[email protected](
- "table_identifier",
- [
- lazy_fixture("random_table_identifier"),
- lazy_fixture("random_hierarchical_identifier"),
- ],
-)
-def test_create_table_custom_sort_order(catalog: SqlCatalog,
table_schema_nested: Schema, table_identifier: Identifier) -> None:
- namespace = Catalog.namespace_from(table_identifier)
- catalog.create_namespace(namespace)
- order = SortOrder(SortField(source_id=2, transform=IdentityTransform(),
null_order=NullOrder.NULLS_FIRST))
- table = catalog.create_table(table_identifier, table_schema_nested,
sort_order=order)
- given_sort_order = table.sort_order()
- assert given_sort_order.order_id == 1, "Order ID must match"
- assert len(given_sort_order.fields) == 1, "Order must have 1 field"
- assert given_sort_order.fields[0].direction == SortDirection.ASC,
"Direction must match"
- assert given_sort_order.fields[0].null_order == NullOrder.NULLS_FIRST,
"Null order must match"
- assert isinstance(given_sort_order.fields[0].transform,
IdentityTransform), "Transform must match"
- catalog.drop_table(table_identifier)
-
-
[email protected](
- "catalog",
- [
- lazy_fixture("catalog_memory"),
- lazy_fixture("catalog_sqlite"),
- ],
-)
[email protected](
- "table_identifier",
- [
- lazy_fixture("random_table_identifier"),
- lazy_fixture("random_hierarchical_identifier"),
- ],
-)
-def test_create_table_with_default_warehouse_location(
- warehouse: Path, catalog: SqlCatalog, table_schema_nested: Schema,
table_identifier: Identifier
-) -> None:
- identifier_tuple = Catalog.identifier_to_tuple(table_identifier)
- namespace = Catalog.namespace_from(table_identifier)
- catalog.create_namespace(namespace)
- catalog.create_table(table_identifier, table_schema_nested)
- table = catalog.load_table(table_identifier)
- assert table.name() == identifier_tuple
- assert table.metadata_location.startswith(f"file://{warehouse}")
- assert os.path.exists(table.metadata_location[len("file://") :])
- catalog.drop_table(table_identifier)
-
-
[email protected](
- "catalog",
- [
- lazy_fixture("catalog_memory"),
- lazy_fixture("catalog_sqlite"),
- ],
-)
[email protected](
- "table_identifier",
- [
- lazy_fixture("random_table_identifier"),
- lazy_fixture("random_hierarchical_identifier"),
- ],
-)
-def test_create_table_with_given_location_removes_trailing_slash(
- warehouse: Path, catalog: SqlCatalog, table_schema_nested: Schema,
table_identifier: Identifier
-) -> None:
- identifier_tuple = Catalog.identifier_to_tuple(table_identifier)
- namespace = Catalog.namespace_from(table_identifier)
- table_name = Catalog.table_name_from(identifier_tuple)
- location = f"file://{warehouse}/{catalog.name}/{table_name}-given"
- catalog.create_namespace(namespace)
- catalog.create_table(table_identifier, table_schema_nested,
location=f"{location}/")
- table = catalog.load_table(table_identifier)
- assert table.name() == identifier_tuple
- assert table.metadata_location.startswith(f"file://{warehouse}")
- assert os.path.exists(table.metadata_location[len("file://") :])
- assert table.location() == location
- catalog.drop_table(table_identifier)
-
-
[email protected](
- "catalog",
- [
- lazy_fixture("catalog_memory"),
- lazy_fixture("catalog_sqlite"),
- ],
-)
[email protected](
- "table_identifier",
- [
- lazy_fixture("random_table_identifier"),
- lazy_fixture("random_hierarchical_identifier"),
- ],
-)
-def test_create_duplicated_table(catalog: SqlCatalog, table_schema_nested:
Schema, table_identifier: Identifier) -> None:
- namespace = Catalog.namespace_from(table_identifier)
- catalog.create_namespace(namespace)
- catalog.create_table(table_identifier, table_schema_nested)
- with pytest.raises(TableAlreadyExistsError):
- catalog.create_table(table_identifier, table_schema_nested)
-
-
[email protected](
- "catalog",
- [
- lazy_fixture("catalog_memory"),
- lazy_fixture("catalog_sqlite"),
- ],
-)
[email protected](
- "table_identifier",
- [
- lazy_fixture("random_table_identifier"),
- lazy_fixture("random_hierarchical_identifier"),
- ],
-)
-def test_create_table_if_not_exists_duplicated_table(
- catalog: SqlCatalog, table_schema_nested: Schema, table_identifier:
Identifier
-) -> None:
- namespace = Catalog.namespace_from(table_identifier)
- catalog.create_namespace(namespace)
- table1 = catalog.create_table(table_identifier, table_schema_nested)
- table2 = catalog.create_table_if_not_exists(table_identifier,
table_schema_nested)
- assert table1.name() == table2.name()
-
-
[email protected](
- "catalog",
- [
- lazy_fixture("catalog_memory"),
- lazy_fixture("catalog_sqlite"),
- ],
-)
-def test_create_table_with_non_existing_namespace(catalog: SqlCatalog,
table_schema_nested: Schema, table_name: str) -> None:
- identifier = ("invalid", table_name)
- with pytest.raises(NoSuchNamespaceError):
- catalog.create_table(identifier, table_schema_nested)
-
-
[email protected](
- "catalog",
- [
- lazy_fixture("catalog_memory"),
- lazy_fixture("catalog_sqlite"),
- ],
-)
-def test_create_table_without_namespace(catalog: SqlCatalog,
table_schema_nested: Schema, table_name: str) -> None:
- with pytest.raises(NoSuchNamespaceError):
- catalog.create_table(table_name, table_schema_nested)
-
-
[email protected](
- "catalog",
- [
- lazy_fixture("catalog_memory"),
- lazy_fixture("catalog_sqlite"),
- ],
-)
[email protected](
- "table_identifier",
- [
- lazy_fixture("random_table_identifier"),
- lazy_fixture("random_hierarchical_identifier"),
- ],
-)
-def test_register_table(catalog: SqlCatalog, table_identifier: Identifier,
metadata_location: str) -> None:
- identifier_tuple = Catalog.identifier_to_tuple(table_identifier)
- namespace = Catalog.namespace_from(table_identifier)
- catalog.create_namespace(namespace)
- table = catalog.register_table(table_identifier, metadata_location)
- assert table.name() == identifier_tuple
- assert table.metadata_location == metadata_location
- assert os.path.exists(metadata_location)
- catalog.drop_table(table_identifier)
-
-
[email protected](
- "catalog",
- [
- lazy_fixture("catalog_memory"),
- lazy_fixture("catalog_sqlite"),
- ],
-)
[email protected](
- "table_identifier",
- [
- lazy_fixture("random_table_identifier"),
- lazy_fixture("random_hierarchical_identifier"),
- ],
-)
-def test_register_existing_table(catalog: SqlCatalog, table_identifier:
Identifier, metadata_location: str) -> None:
- namespace = Catalog.namespace_from(table_identifier)
- catalog.create_namespace(namespace)
- catalog.register_table(table_identifier, metadata_location)
- with pytest.raises(TableAlreadyExistsError):
- catalog.register_table(table_identifier, metadata_location)
-
-
[email protected](
- "catalog",
- [
- lazy_fixture("catalog_memory"),
- lazy_fixture("catalog_sqlite"),
- ],
-)
-def test_register_table_with_non_existing_namespace(catalog: SqlCatalog,
metadata_location: str, table_name: str) -> None:
- identifier = ("invalid", table_name)
- with pytest.raises(NoSuchNamespaceError):
- catalog.register_table(identifier, metadata_location)
-
-
[email protected](
- "catalog",
- [
- lazy_fixture("catalog_memory"),
- lazy_fixture("catalog_sqlite"),
- ],
-)
-def test_register_table_without_namespace(catalog: SqlCatalog,
metadata_location: str, table_name: str) -> None:
- with pytest.raises(ValueError):
- catalog.register_table(table_name, metadata_location)
-
-
[email protected](
- "catalog",
- [
- lazy_fixture("catalog_memory"),
- lazy_fixture("catalog_sqlite"),
- ],
-)
[email protected](
- "table_identifier",
- [
- lazy_fixture("random_table_identifier"),
- lazy_fixture("random_hierarchical_identifier"),
- ],
-)
-def test_load_table(catalog: SqlCatalog, table_schema_nested: Schema,
table_identifier: Identifier) -> None:
- namespace = Catalog.namespace_from(table_identifier)
- catalog.create_namespace(namespace)
- table = catalog.create_table(table_identifier, table_schema_nested)
- loaded_table = catalog.load_table(table_identifier)
- assert table.name() == loaded_table.name()
- assert table.metadata_location == loaded_table.metadata_location
- assert table.metadata == loaded_table.metadata
-
-
[email protected](
- "catalog",
- [
- lazy_fixture("catalog_memory"),
- lazy_fixture("catalog_sqlite"),
- ],
-)
[email protected](
- "table_identifier",
- [
- lazy_fixture("random_table_identifier"),
- lazy_fixture("random_hierarchical_identifier"),
- ],
-)
-def test_load_table_from_self_identifier(catalog: SqlCatalog,
table_schema_nested: Schema, table_identifier: Identifier) -> None:
- identifier_tuple = Catalog.identifier_to_tuple(table_identifier)
- namespace = Catalog.namespace_from(table_identifier)
- catalog.create_namespace(namespace)
- table = catalog.create_table(table_identifier, table_schema_nested)
- intermediate = catalog.load_table(table_identifier)
- assert intermediate.name() == identifier_tuple
- loaded_table = catalog.load_table(intermediate.name())
- assert table.name() == loaded_table.name()
- assert table.metadata_location == loaded_table.metadata_location
- assert table.metadata == loaded_table.metadata
-
-
[email protected](
- "catalog",
- [
- lazy_fixture("catalog_memory"),
- lazy_fixture("catalog_sqlite"),
- lazy_fixture("catalog_sqlite_without_rowcount"),
- ],
-)
[email protected](
- "table_identifier",
- [
- lazy_fixture("random_table_identifier"),
- lazy_fixture("random_hierarchical_identifier"),
- ],
-)
-def test_drop_table(catalog: SqlCatalog, table_schema_nested: Schema,
table_identifier: Identifier) -> None:
- identifier_tuple = Catalog.identifier_to_tuple(table_identifier)
- namespace = Catalog.namespace_from(table_identifier)
- catalog.create_namespace(namespace)
- table = catalog.create_table(table_identifier, table_schema_nested)
- assert table.name() == identifier_tuple
- catalog.drop_table(table_identifier)
- with pytest.raises(NoSuchTableError):
- catalog.load_table(table_identifier)
-
-
[email protected](
- "catalog",
- [
- lazy_fixture("catalog_memory"),
- lazy_fixture("catalog_sqlite"),
- lazy_fixture("catalog_sqlite_without_rowcount"),
- ],
-)
[email protected](
- "table_identifier",
- [
- lazy_fixture("random_table_identifier"),
- lazy_fixture("random_hierarchical_identifier"),
- ],
-)
-def test_drop_table_from_self_identifier(catalog: SqlCatalog,
table_schema_nested: Schema, table_identifier: Identifier) -> None:
- identifier_tuple = Catalog.identifier_to_tuple(table_identifier)
- namespace = Catalog.namespace_from(table_identifier)
- catalog.create_namespace(namespace)
- table = catalog.create_table(table_identifier, table_schema_nested)
- assert table.name() == identifier_tuple
- catalog.drop_table(table.name())
- with pytest.raises(NoSuchTableError):
- catalog.load_table(table.name())
- with pytest.raises(NoSuchTableError):
- catalog.load_table(table_identifier)
-
-
[email protected](
- "catalog",
- [
- lazy_fixture("catalog_memory"),
- lazy_fixture("catalog_sqlite"),
- lazy_fixture("catalog_sqlite_without_rowcount"),
- ],
-)
[email protected](
- "table_identifier",
- [
- lazy_fixture("random_table_identifier"),
- lazy_fixture("random_hierarchical_identifier"),
- ],
-)
-def test_drop_table_that_does_not_exist(catalog: SqlCatalog, table_identifier:
Identifier) -> None:
- with pytest.raises(NoSuchTableError):
- catalog.drop_table(table_identifier)
-
-
[email protected](
- "catalog",
- [
- lazy_fixture("catalog_memory"),
- lazy_fixture("catalog_sqlite"),
- lazy_fixture("catalog_sqlite_without_rowcount"),
- ],
-)
[email protected](
- "from_table_identifier",
- [
- lazy_fixture("random_table_identifier"),
- lazy_fixture("random_hierarchical_identifier"),
- ],
-)
[email protected](
- "to_table_identifier",
- [
- lazy_fixture("another_random_table_identifier"),
- lazy_fixture("another_random_hierarchical_identifier"),
- ],
-)
-def test_rename_table(
- catalog: SqlCatalog, table_schema_nested: Schema, from_table_identifier:
Identifier, to_table_identifier: Identifier
-) -> None:
- from_namespace = Catalog.namespace_from(from_table_identifier)
- to_namespace = Catalog.namespace_from(to_table_identifier)
- catalog.create_namespace(from_namespace)
- catalog.create_namespace(to_namespace)
- table = catalog.create_table(from_table_identifier, table_schema_nested)
- assert table.name() == from_table_identifier
- catalog.rename_table(from_table_identifier, to_table_identifier)
- new_table = catalog.load_table(to_table_identifier)
- assert new_table.name() == to_table_identifier
- assert new_table.metadata_location == table.metadata_location
- with pytest.raises(NoSuchTableError):
- catalog.load_table(from_table_identifier)
-
-
[email protected](
- "catalog",
- [
- lazy_fixture("catalog_memory"),
- lazy_fixture("catalog_sqlite"),
- lazy_fixture("catalog_sqlite_without_rowcount"),
- ],
-)
[email protected](
- "from_table_identifier",
- [
- lazy_fixture("random_table_identifier"),
- lazy_fixture("random_hierarchical_identifier"),
- ],
-)
[email protected](
- "to_table_identifier",
- [
- lazy_fixture("another_random_table_identifier"),
- lazy_fixture("another_random_hierarchical_identifier"),
- ],
-)
-def test_rename_table_from_self_identifier(
- catalog: SqlCatalog, table_schema_nested: Schema, from_table_identifier:
Identifier, to_table_identifier: Identifier
-) -> None:
- from_namespace = Catalog.namespace_from(from_table_identifier)
- to_namespace = Catalog.namespace_from(to_table_identifier)
- catalog.create_namespace(from_namespace)
- catalog.create_namespace(to_namespace)
- table = catalog.create_table(from_table_identifier, table_schema_nested)
- assert table.name() == from_table_identifier
- catalog.rename_table(table.name(), to_table_identifier)
- new_table = catalog.load_table(to_table_identifier)
- assert new_table.name() == to_table_identifier
- assert new_table.metadata_location == table.metadata_location
- with pytest.raises(NoSuchTableError):
- catalog.load_table(table.name())
- with pytest.raises(NoSuchTableError):
- catalog.load_table(from_table_identifier)
-
-
[email protected](
- "catalog",
- [
- lazy_fixture("catalog_memory"),
- lazy_fixture("catalog_sqlite"),
- lazy_fixture("catalog_sqlite_without_rowcount"),
- ],
-)
[email protected](
- "from_table_identifier",
- [
- lazy_fixture("random_table_identifier"),
- lazy_fixture("random_hierarchical_identifier"),
- ],
-)
[email protected](
- "to_table_identifier",
- [
- lazy_fixture("another_random_table_identifier"),
- lazy_fixture("another_random_hierarchical_identifier"),
- ],
-)
-def test_rename_table_to_existing_one(
- catalog: SqlCatalog, table_schema_nested: Schema, from_table_identifier:
Identifier, to_table_identifier: Identifier
-) -> None:
- from_namespace = Catalog.namespace_from(from_table_identifier)
- to_namespace = Catalog.namespace_from(to_table_identifier)
- catalog.create_namespace(from_namespace)
- catalog.create_namespace(to_namespace)
- table = catalog.create_table(from_table_identifier, table_schema_nested)
- assert table.name() == from_table_identifier
- new_table = catalog.create_table(to_table_identifier, table_schema_nested)
- assert new_table.name() == to_table_identifier
- with pytest.raises(TableAlreadyExistsError):
- catalog.rename_table(from_table_identifier, to_table_identifier)
-
-
[email protected](
- "catalog",
- [
- lazy_fixture("catalog_memory"),
- lazy_fixture("catalog_sqlite"),
- lazy_fixture("catalog_sqlite_without_rowcount"),
- ],
-)
[email protected](
- "from_table_identifier",
- [
- lazy_fixture("random_table_identifier"),
- lazy_fixture("random_hierarchical_identifier"),
- ],
-)
[email protected](
- "to_table_identifier",
- [
- lazy_fixture("another_random_table_identifier"),
- lazy_fixture("another_random_hierarchical_identifier"),
- ],
-)
-def test_rename_missing_table(catalog: SqlCatalog, from_table_identifier:
Identifier, to_table_identifier: Identifier) -> None:
- to_namespace = Catalog.namespace_from(to_table_identifier)
- catalog.create_namespace(to_namespace)
- with pytest.raises(NoSuchTableError):
- catalog.rename_table(from_table_identifier, to_table_identifier)
-
-
[email protected](
- "catalog",
- [
- lazy_fixture("catalog_memory"),
- lazy_fixture("catalog_sqlite"),
- lazy_fixture("catalog_sqlite_without_rowcount"),
- ],
-)
[email protected](
- "from_table_identifier",
- [
- lazy_fixture("random_table_identifier"),
- lazy_fixture("random_hierarchical_identifier"),
- ],
-)
[email protected](
- "to_table_identifier",
- [
- lazy_fixture("another_random_table_identifier"),
- lazy_fixture("another_random_hierarchical_identifier"),
- ],
-)
-def test_rename_table_to_missing_namespace(
- catalog: SqlCatalog, table_schema_nested: Schema, from_table_identifier:
Identifier, to_table_identifier: Identifier
-) -> None:
- from_namespace = Catalog.namespace_from(from_table_identifier)
- catalog.create_namespace(from_namespace)
- table = catalog.create_table(from_table_identifier, table_schema_nested)
- assert table.name() == from_table_identifier
- with pytest.raises(NoSuchNamespaceError):
- catalog.rename_table(from_table_identifier, to_table_identifier)
-
-
[email protected](
- "catalog",
- [
- lazy_fixture("catalog_memory"),
- lazy_fixture("catalog_sqlite"),
- ],
-)
[email protected](
- "table_identifier_1",
- [
- lazy_fixture("random_table_identifier"),
- lazy_fixture("random_hierarchical_identifier"),
- ],
-)
[email protected](
- "table_identifier_2",
- [
- lazy_fixture("another_random_table_identifier"),
- lazy_fixture("another_random_hierarchical_identifier"),
- ],
-)
-def test_list_tables(
- catalog: SqlCatalog, table_schema_nested: Schema, table_identifier_1:
Identifier, table_identifier_2: Identifier
-) -> None:
- namespace_1 = Catalog.namespace_from(table_identifier_1)
- namespace_2 = Catalog.namespace_from(table_identifier_2)
- catalog.create_namespace(namespace_1)
- catalog.create_namespace(namespace_2)
- catalog.create_table(table_identifier_1, table_schema_nested)
- catalog.create_table(table_identifier_2, table_schema_nested)
- identifier_list = catalog.list_tables(namespace_1)
- assert len(identifier_list) == 1
- assert table_identifier_1 in identifier_list
-
- identifier_list = catalog.list_tables(namespace_2)
- assert len(identifier_list) == 1
- assert table_identifier_2 in identifier_list
-
-
[email protected](
- "catalog",
- [
- lazy_fixture("catalog_memory"),
- lazy_fixture("catalog_sqlite"),
- ],
-)
[email protected]("namespace", [lazy_fixture("database_name"),
lazy_fixture("hierarchical_namespace_name")])
-def test_list_tables_when_missing_namespace(catalog: SqlCatalog, namespace:
str) -> None:
- with pytest.raises(NoSuchNamespaceError):
- catalog.list_tables(namespace)
-
-
[email protected](
- "catalog",
- [
- lazy_fixture("catalog_memory"),
- lazy_fixture("catalog_sqlite"),
- ],
-)
-def test_create_namespace_if_not_exists(catalog: SqlCatalog, database_name:
str) -> None:
- catalog.create_namespace(database_name)
- assert (database_name,) in catalog.list_namespaces()
- catalog.create_namespace_if_not_exists(database_name)
- assert (database_name,) in catalog.list_namespaces()
-
-
[email protected](
- "catalog",
- [
- lazy_fixture("catalog_memory"),
- lazy_fixture("catalog_sqlite"),
- ],
-)
[email protected]("namespace", [lazy_fixture("database_name"),
lazy_fixture("hierarchical_namespace_name")])
-def test_create_namespace(catalog: SqlCatalog, namespace: str) -> None:
- catalog.create_namespace(namespace)
- assert (Catalog.identifier_to_tuple(namespace)[:1]) in
catalog.list_namespaces()
-
-
[email protected](
- "catalog",
- [
- lazy_fixture("catalog_memory"),
- lazy_fixture("catalog_sqlite"),
- ],
-)
[email protected]("namespace", [lazy_fixture("database_name"),
lazy_fixture("hierarchical_namespace_name")])
-def test_create_duplicate_namespace(catalog: SqlCatalog, namespace: str) ->
None:
- catalog.create_namespace(namespace)
- with pytest.raises(NamespaceAlreadyExistsError):
- catalog.create_namespace(namespace)
-
-
[email protected](
- "catalog",
- [
- lazy_fixture("catalog_memory"),
- lazy_fixture("catalog_sqlite"),
- ],
-)
[email protected]("namespace", [lazy_fixture("database_name"),
lazy_fixture("hierarchical_namespace_name")])
-def test_create_namespaces_sharing_same_prefix(catalog: SqlCatalog, namespace:
str) -> None:
- catalog.create_namespace(namespace + "_1")
- # Second namespace is a prefix of the first one, make sure it can be added.
- catalog.create_namespace(namespace)
-
-
[email protected](
- "catalog",
- [
- lazy_fixture("catalog_memory"),
- lazy_fixture("catalog_sqlite"),
- ],
-)
[email protected]("namespace", [lazy_fixture("database_name"),
lazy_fixture("hierarchical_namespace_name")])
-def test_create_namespace_with_comment_and_location(catalog: SqlCatalog,
namespace: str) -> None:
- test_location = "/test/location"
- test_properties = {
- "comment": "this is a test description",
- "location": test_location,
- }
- catalog.create_namespace(namespace=namespace, properties=test_properties)
- loaded_database_list = catalog.list_namespaces()
- assert Catalog.identifier_to_tuple(namespace)[:1] in loaded_database_list
- properties = catalog.load_namespace_properties(namespace)
- assert properties["comment"] == "this is a test description"
- assert properties["location"] == test_location
-
-
[email protected](
- "catalog",
- [
- lazy_fixture("catalog_memory"),
- lazy_fixture("catalog_sqlite"),
- ],
-)
[email protected]("namespace", [lazy_fixture("database_name"),
lazy_fixture("hierarchical_namespace_name")])
[email protected]("ignore")
-def test_create_namespace_with_null_properties(catalog: SqlCatalog, namespace:
str) -> None:
- with pytest.raises(IntegrityError):
- catalog.create_namespace(namespace=namespace, properties={None:
"value"}) # type: ignore
-
- with pytest.raises(IntegrityError):
- catalog.create_namespace(namespace=namespace, properties={"key": None})
-
-
[email protected](
- "catalog",
- [
- lazy_fixture("catalog_memory"),
- lazy_fixture("catalog_sqlite"),
- ],
-)
[email protected]("empty_namespace", ["", (), (""), ("", ""), " ", ("
")])
-def test_create_namespace_with_empty_identifier(catalog: SqlCatalog,
empty_namespace: Any) -> None:
- with pytest.raises(NoSuchNamespaceError):
- catalog.create_namespace(empty_namespace)
-
-
[email protected](
- "catalog",
- [
- lazy_fixture("catalog_memory"),
- lazy_fixture("catalog_sqlite"),
- ],
-)
-def test_namespace_exists(catalog: SqlCatalog) -> None:
- for ns in [("db1",), ("db1", "ns1"), ("db2", "ns1"), ("db3", "ns1",
"ns2")]:
- catalog.create_namespace(ns)
- assert catalog._namespace_exists(ns)
-
- assert catalog._namespace_exists("db2") # `db2` exists because `db2.ns1`
exists
- assert catalog._namespace_exists("db3.ns1") # `db3.ns1` exists because
`db3.ns1.ns2` exists
- assert not catalog._namespace_exists("db_") # make sure '_' is escaped in
the query
- assert not catalog._namespace_exists("db%") # make sure '%' is escaped in
the query
-
-
[email protected](
- "catalog",
- [
- lazy_fixture("catalog_memory"),
- lazy_fixture("catalog_sqlite"),
- ],
-)
-def test_list_namespaces(catalog: SqlCatalog) -> None:
- namespace_list = ["db", "db.ns1", "db.ns1.ns2", "db.ns2", "db2",
"db2.ns1", "db%"]
- for namespace in namespace_list:
- if not catalog._namespace_exists(namespace):
- catalog.create_namespace(namespace)
-
- ns_list = catalog.list_namespaces()
- for ns in [("db",), ("db%",), ("db2",)]:
- assert ns in ns_list
-
- ns_list = catalog.list_namespaces("db")
- assert sorted(ns_list) == [("db", "ns1"), ("db", "ns2")]
-
- ns_list = catalog.list_namespaces("db.ns1")
- assert sorted(ns_list) == [("db", "ns1", "ns2")]
-
- ns_list = catalog.list_namespaces("db.ns1.ns2")
- assert len(ns_list) == 0
-
-
[email protected](
- "catalog",
- [
- lazy_fixture("catalog_memory"),
- lazy_fixture("catalog_sqlite"),
- ],
-)
-def test_list_namespaces_fuzzy_match(catalog: SqlCatalog) -> None:
- namespace_list = ["db.ns1", "db.ns1.ns2", "db.ns2", "db.ns1X.ns3",
"db_.ns1.ns2", "db2.ns1.ns2"]
- for namespace in namespace_list:
- if not catalog._namespace_exists(namespace):
- catalog.create_namespace(namespace)
-
- assert catalog.list_namespaces("db.ns1") == [("db", "ns1", "ns2")]
-
- assert catalog.list_namespaces("db_.ns1") == [("db_", "ns1", "ns2")]
-
-
[email protected](
- "catalog",
- [
- lazy_fixture("catalog_memory"),
- lazy_fixture("catalog_sqlite"),
- ],
-)
-def test_list_non_existing_namespaces(catalog: SqlCatalog) -> None:
- with pytest.raises(NoSuchNamespaceError):
- catalog.list_namespaces("does_not_exist")
-
-
[email protected](
- "catalog",
- [
- lazy_fixture("catalog_memory"),
- lazy_fixture("catalog_sqlite"),
- ],
-)
[email protected](
- "table_identifier",
- [
- lazy_fixture("random_table_identifier"),
- lazy_fixture("random_hierarchical_identifier"),
- ],
-)
-def test_drop_namespace(catalog: SqlCatalog, table_schema_nested: Schema,
table_identifier: Identifier) -> None:
- namespace = Catalog.namespace_from(table_identifier)
- catalog.create_namespace(namespace)
- assert catalog._namespace_exists(namespace)
- catalog.create_table(table_identifier, table_schema_nested)
- with pytest.raises(NamespaceNotEmptyError):
- catalog.drop_namespace(namespace)
- catalog.drop_table(table_identifier)
- catalog.drop_namespace(namespace)
- assert not catalog._namespace_exists(namespace)
-
-
[email protected](
- "catalog",
- [
- lazy_fixture("catalog_memory"),
- lazy_fixture("catalog_sqlite"),
- ],
-)
-def test_drop_non_existing_namespaces(catalog: SqlCatalog) -> None:
- with pytest.raises(NoSuchNamespaceError):
- catalog.drop_namespace("does_not_exist")
-
-
[email protected](
- "catalog",
- [
- lazy_fixture("catalog_memory"),
- lazy_fixture("catalog_sqlite"),
- ],
-)
[email protected]("namespace", [lazy_fixture("database_name"),
lazy_fixture("hierarchical_namespace_name")])
-def test_load_namespace_properties(catalog: SqlCatalog, namespace: str) ->
None:
- warehouse_location = "/test/location"
- test_properties = {
- "comment": "this is a test description",
- "location": f"{warehouse_location}/{namespace}",
- "test_property1": "1",
- "test_property2": "2",
- "test_property3": "3",
- }
-
- catalog.create_namespace(namespace, test_properties)
- listed_properties = catalog.load_namespace_properties(namespace)
- for k, v in listed_properties.items():
- assert k in test_properties
- assert v == test_properties[k]
-
-
[email protected](
- "catalog",
- [
- lazy_fixture("catalog_memory"),
- lazy_fixture("catalog_sqlite"),
- ],
-)
[email protected]("namespace", [lazy_fixture("database_name"),
lazy_fixture("hierarchical_namespace_name")])
-def test_load_empty_namespace_properties(catalog: SqlCatalog, namespace: str)
-> None:
- catalog.create_namespace(namespace)
- listed_properties = catalog.load_namespace_properties(namespace)
- assert listed_properties == {"exists": "true"}
-
-
[email protected](
- "catalog",
- [
- lazy_fixture("catalog_memory"),
- lazy_fixture("catalog_sqlite"),
- ],
-)
-def test_load_namespace_properties_non_existing_namespace(catalog: SqlCatalog)
-> None:
- with pytest.raises(NoSuchNamespaceError):
- catalog.load_namespace_properties("does_not_exist")
-
-
[email protected](
- "catalog",
- [
- lazy_fixture("catalog_memory"),
- lazy_fixture("catalog_sqlite"),
- ],
-)
[email protected]("namespace", [lazy_fixture("database_name"),
lazy_fixture("hierarchical_namespace_name")])
-def test_update_namespace_properties(catalog: SqlCatalog, namespace: str) ->
None:
- warehouse_location = "/test/location"
- test_properties = {
- "comment": "this is a test description",
- "location": f"{warehouse_location}/{namespace}",
- "test_property1": "1",
- "test_property2": "2",
- "test_property3": "3",
- }
- removals = {"test_property1", "test_property2", "test_property3",
"should_not_removed"}
- updates = {"test_property4": "4", "test_property5": "5", "comment":
"updated test description"}
- catalog.create_namespace(namespace, test_properties)
- update_report = catalog.update_namespace_properties(namespace, removals,
updates)
- for k in updates.keys():
- assert k in update_report.updated
- for k in removals:
- if k == "should_not_removed":
- assert k in update_report.missing
- else:
- assert k in update_report.removed
- assert catalog.load_namespace_properties(namespace) == {
- "comment": "updated test description",
- "test_property4": "4",
- "test_property5": "5",
- "location": f"{warehouse_location}/{namespace}",
- }
-
-
[email protected](
- "catalog",
- [
- lazy_fixture("catalog_memory"),
- lazy_fixture("catalog_sqlite"),
- lazy_fixture("catalog_sqlite_without_rowcount"),
- ],
-)
[email protected](
- "table_identifier",
- [
- lazy_fixture("random_table_identifier"),
- lazy_fixture("random_hierarchical_identifier"),
- ],
-)
-def test_commit_table(catalog: SqlCatalog, table_schema_nested: Schema,
table_identifier: Identifier) -> None:
- namespace = Catalog.namespace_from(table_identifier)
- catalog.create_namespace(namespace)
- table = catalog.create_table(table_identifier, table_schema_nested)
- last_updated_ms = table.metadata.last_updated_ms
- original_table_metadata_location = table.metadata_location
- original_table_last_updated_ms = table.metadata.last_updated_ms
-
- assert catalog._parse_metadata_version(table.metadata_location) == 0
- assert table.metadata.current_schema_id == 0
-
- transaction = table.transaction()
- update = transaction.update_schema()
- update.add_column(path="b", field_type=IntegerType())
- update.commit()
- transaction.commit_transaction()
-
- updated_table_metadata = table.metadata
-
- assert catalog._parse_metadata_version(table.metadata_location) == 1
- assert updated_table_metadata.current_schema_id == 1
- assert len(updated_table_metadata.schemas) == 2
- new_schema = next(schema for schema in updated_table_metadata.schemas if
schema.schema_id == 1)
- assert new_schema
- assert new_schema == update._apply()
- assert new_schema.find_field("b").field_type == IntegerType()
- assert updated_table_metadata.last_updated_ms > last_updated_ms
- assert len(updated_table_metadata.metadata_log) == 1
- assert updated_table_metadata.metadata_log[0].metadata_file ==
original_table_metadata_location
- assert updated_table_metadata.metadata_log[0].timestamp_ms ==
original_table_last_updated_ms
-
-
[email protected](
- "catalog",
- [
- lazy_fixture("catalog_memory"),
- lazy_fixture("catalog_sqlite"),
- lazy_fixture("catalog_sqlite_without_rowcount"),
- lazy_fixture("catalog_sqlite_fsspec"),
- ],
-)
[email protected](
- "table_identifier",
- [
- lazy_fixture("random_table_identifier"),
- lazy_fixture("random_hierarchical_identifier"),
- ],
-)
-def test_append_table(catalog: SqlCatalog, table_schema_simple: Schema,
table_identifier: Identifier) -> None:
- namespace = Catalog.namespace_from(table_identifier)
- catalog.create_namespace(namespace)
- table = catalog.create_table(table_identifier, table_schema_simple)
-
- df = pa.Table.from_pydict(
- {
- "foo": ["a"],
- "bar": [1],
- "baz": [True],
- },
- schema=schema_to_pyarrow(table_schema_simple),
- )
-
- table.append(df)
-
- # new snapshot is written in APPEND mode
- assert len(table.metadata.snapshots) == 1
- assert table.metadata.snapshots[0].snapshot_id ==
table.metadata.current_snapshot_id
- assert table.metadata.snapshots[0].parent_snapshot_id is None
- assert table.metadata.snapshots[0].sequence_number == 1
- assert table.metadata.snapshots[0].summary is not None
- assert table.metadata.snapshots[0].summary.operation == Operation.APPEND
- assert table.metadata.snapshots[0].summary["added-data-files"] == "1"
- assert table.metadata.snapshots[0].summary["added-records"] == "1"
- assert table.metadata.snapshots[0].summary["total-data-files"] == "1"
- assert table.metadata.snapshots[0].summary["total-records"] == "1"
- assert len(table.metadata.metadata_log) == 1
-
- # read back the data
- assert df == table.scan().to_arrow()
-
-
[email protected](
- "catalog",
- [
- lazy_fixture("catalog_memory"),
- lazy_fixture("catalog_sqlite"),
- lazy_fixture("catalog_sqlite_without_rowcount"),
- ],
-)
[email protected](
- "table_identifier",
- [
- lazy_fixture("random_table_identifier"),
- lazy_fixture("random_hierarchical_identifier"),
- ],
-)
-def test_concurrent_commit_table(catalog: SqlCatalog, table_schema_simple:
Schema, table_identifier: Identifier) -> None:
- namespace = Catalog.namespace_from(table_identifier)
- catalog.create_namespace(namespace)
- table_a = catalog.create_table(table_identifier, table_schema_simple)
- table_b = catalog.load_table(table_identifier)
-
- with table_a.update_schema() as update:
- update.add_column(path="b", field_type=IntegerType())
-
- with pytest.raises(CommitFailedException, match="Requirement failed:
current schema id has changed: expected 0, found 1"):
- # This one should fail since it already has been updated
- with table_b.update_schema() as update:
- update.add_column(path="c", field_type=IntegerType())
-
-
[email protected](
- "catalog",
- [
- lazy_fixture("catalog_memory"),
- lazy_fixture("catalog_sqlite"),
- lazy_fixture("catalog_sqlite_without_rowcount"),
- ],
-)
[email protected]("format_version", [1, 2])
-def test_write_and_evolve(catalog: SqlCatalog, format_version: int) -> None:
- identifier =
f"default.arrow_write_data_and_evolve_schema_v{format_version}"
-
- try:
- catalog.create_namespace("default")
- except NamespaceAlreadyExistsError:
- pass
-
- try:
- catalog.drop_table(identifier=identifier)
- except NoSuchTableError:
- pass
-
- pa_table = pa.Table.from_pydict(
- {
- "foo": ["a", None, "z"],
- },
- schema=pa.schema([pa.field("foo", pa.large_string(), nullable=True)]),
- )
-
- tbl = catalog.create_table(identifier=identifier, schema=pa_table.schema,
properties={"format-version": str(format_version)})
-
- pa_table_with_column = pa.Table.from_pydict(
- {
- "foo": ["a", None, "z"],
- "bar": [19, None, 25],
- },
- schema=pa.schema(
- [
- pa.field("foo", pa.large_string(), nullable=True),
- pa.field("bar", pa.int32(), nullable=True),
- ]
- ),
- )
-
- with tbl.transaction() as txn:
- with txn.update_schema() as schema_txn:
- schema_txn.union_by_name(pa_table_with_column.schema)
-
- txn.append(pa_table_with_column)
- txn.overwrite(pa_table_with_column)
- txn.delete("foo = 'a'")
-
-
[email protected](
- "catalog",
- [
- lazy_fixture("catalog_memory"),
- lazy_fixture("catalog_sqlite"),
- lazy_fixture("catalog_sqlite_without_rowcount"),
- ],
-)
[email protected]("format_version", [1, 2])
-def test_create_table_transaction(catalog: SqlCatalog, format_version: int) ->
None:
- identifier =
f"default.arrow_create_table_transaction_{catalog.name}_{format_version}"
- try:
- catalog.create_namespace("default")
- except NamespaceAlreadyExistsError:
- pass
-
- try:
- catalog.drop_table(identifier=identifier)
- except NoSuchTableError:
- pass
-
- pa_table = pa.Table.from_pydict(
- {
- "foo": ["a", None, "z"],
- },
- schema=pa.schema([pa.field("foo", pa.large_string(), nullable=True)]),
- )
-
- pa_table_with_column = pa.Table.from_pydict(
- {
- "foo": ["a", None, "z"],
- "bar": [19, None, 25],
- },
- schema=pa.schema(
- [
- pa.field("foo", pa.large_string(), nullable=True),
- pa.field("bar", pa.int32(), nullable=True),
- ]
- ),
- )
-
- with catalog.create_table_transaction(
- identifier=identifier, schema=pa_table.schema,
properties={"format-version": str(format_version)}
- ) as txn:
- with txn.update_snapshot().fast_append() as snapshot_update:
- for data_file in
_dataframe_to_data_files(table_metadata=txn.table_metadata, df=pa_table,
io=txn._table.io):
- snapshot_update.append_data_file(data_file)
-
- with txn.update_schema() as schema_txn:
- schema_txn.union_by_name(pa_table_with_column.schema)
-
- with txn.update_snapshot().fast_append() as snapshot_update:
- for data_file in _dataframe_to_data_files(
- table_metadata=txn.table_metadata, df=pa_table_with_column,
io=txn._table.io
- ):
- snapshot_update.append_data_file(data_file)
-
- tbl = catalog.load_table(identifier=identifier)
- assert tbl.format_version == format_version
- assert len(tbl.scan().to_arrow()) == 6
-
-
[email protected](
- "catalog",
- [
- lazy_fixture("catalog_memory"),
- lazy_fixture("catalog_sqlite"),
- lazy_fixture("catalog_sqlite_without_rowcount"),
- ],
-)
[email protected](
- "table_identifier",
- [
- lazy_fixture("random_table_identifier"),
- lazy_fixture("random_hierarchical_identifier"),
- ],
-)
-def test_table_properties_int_value(catalog: SqlCatalog, table_schema_simple:
Schema, table_identifier: Identifier) -> None:
- # table properties can be set to int, but still serialized to string
- namespace = Catalog.namespace_from(table_identifier)
- catalog.create_namespace(namespace)
- property_with_int = {"property_name": 42}
- table = catalog.create_table(table_identifier, table_schema_simple,
properties=property_with_int)
- assert isinstance(table.properties["property_name"], str)
-
-
[email protected](
- "catalog",
- [
- lazy_fixture("catalog_memory"),
- lazy_fixture("catalog_sqlite"),
- lazy_fixture("catalog_sqlite_without_rowcount"),
- ],
-)
[email protected](
- "table_identifier",
- [
- lazy_fixture("random_table_identifier"),
- lazy_fixture("random_hierarchical_identifier"),
- ],
-)
-def test_table_properties_raise_for_none_value(
- catalog: SqlCatalog, table_schema_simple: Schema, table_identifier:
Identifier
-) -> None:
- namespace = Catalog.namespace_from(table_identifier)
- catalog.create_namespace(namespace)
- property_with_none = {"property_name": None}
- with pytest.raises(ValidationError) as exc_info:
- _ = catalog.create_table(table_identifier, table_schema_simple,
properties=property_with_none)
- assert "None type is not a supported value in properties: property_name"
in str(exc_info.value)
-
-
[email protected](
- "catalog",
- [
- lazy_fixture("catalog_memory"),
- lazy_fixture("catalog_sqlite"),
- ],
-)
[email protected](
- "table_identifier",
- [
- lazy_fixture("random_table_identifier"),
- lazy_fixture("random_hierarchical_identifier"),
- ],
-)
-def test_table_exists(catalog: SqlCatalog, table_schema_simple: Schema,
table_identifier: Identifier) -> None:
- namespace = Catalog.namespace_from(table_identifier)
- catalog.create_namespace(namespace)
- catalog.create_table(table_identifier, table_schema_simple,
properties={"format-version": "2"})
- existing_table = table_identifier
- # Act and Assert for an existing table
- assert catalog.table_exists(existing_table) is True
-
- # Act and Assert for a non-existing table
- assert catalog.table_exists(("non", "exist")) is False
-
-
[email protected](
- "catalog",
- [
- lazy_fixture("catalog_memory"),
- lazy_fixture("catalog_sqlite"),
- ],
-)
[email protected]("format_version", [1, 2])
-def test_merge_manifests_local_file_system(catalog: SqlCatalog,
arrow_table_with_null: pa.Table, format_version: int) -> None:
- # To catch manifest file name collision bug during merge:
- # https://github.com/apache/iceberg-python/pull/363#discussion_r1660691918
- catalog.create_namespace_if_not_exists("default")
- try:
- catalog.drop_table("default.test_merge_manifest")
- except NoSuchTableError:
- pass
- tbl = catalog.create_table(
- "default.test_merge_manifest",
- arrow_table_with_null.schema,
- properties={
- "commit.manifest-merge.enabled": "true",
- "commit.manifest.min-count-to-merge": "2",
- "format-version": format_version,
- },
- )
-
- for _ in range(5):
- tbl.append(arrow_table_with_null)
-
- assert len(tbl.scan().to_arrow()) == 5 * len(arrow_table_with_null)
- current_snapshot = tbl.current_snapshot()
- assert current_snapshot
- manifests = current_snapshot.manifests(tbl.io)
- assert len(manifests) == 1
-
-
[email protected](
- "catalog",
- [
- lazy_fixture("catalog_memory"),
- lazy_fixture("catalog_sqlite"),
- lazy_fixture("catalog_sqlite_without_rowcount"),
- ],
-)
-def test_delete_metadata_multiple(catalog: SqlCatalog, table_schema_nested:
Schema, random_table_identifier: str) -> None:
- namespace = Catalog.namespace_from(random_table_identifier)
- catalog.create_namespace(namespace)
- table = catalog.create_table(random_table_identifier, table_schema_nested)
-
- original_metadata_location = table.metadata_location
-
- for i in range(5):
- with table.transaction() as transaction:
- with transaction.update_schema() as update:
- update.add_column(path=f"new_column_{i}",
field_type=IntegerType())
-
- assert len(table.metadata.metadata_log) == 5
- assert os.path.exists(original_metadata_location[len("file://") :])
-
- # Set the max versions property to 2, and delete after commit
- new_property = {
- TableProperties.METADATA_PREVIOUS_VERSIONS_MAX: "2",
- TableProperties.METADATA_DELETE_AFTER_COMMIT_ENABLED: "true",
- }
-
- with table.transaction() as transaction:
- transaction.set_properties(properties=new_property)
-
- # Verify that only the most recent metadata files are kept
- assert len(table.metadata.metadata_log) == 2
- updated_metadata_1, updated_metadata_2 = table.metadata.metadata_log
-
- # new metadata log was added, so earlier metadata logs are removed.
- with table.transaction() as transaction:
- with transaction.update_schema() as update:
- update.add_column(path="new_column_x", field_type=IntegerType())
-
- assert len(table.metadata.metadata_log) == 2
- assert not os.path.exists(original_metadata_location[len("file://") :])
- assert not os.path.exists(updated_metadata_1.metadata_file[len("file://")
:])
- assert os.path.exists(updated_metadata_2.metadata_file[len("file://") :])
-
-
class TestSqlCatalogClose:
"""Test SqlCatalog close functionality."""
diff --git a/tests/conftest.py b/tests/conftest.py
index 9dff4258..9d80f489 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -45,9 +45,12 @@ import boto3
import pytest
from moto import mock_aws
from pydantic_core import to_json
+from pytest_lazyfixture import lazy_fixture
from pyiceberg.catalog import Catalog, load_catalog
+from pyiceberg.catalog.memory import InMemoryCatalog
from pyiceberg.catalog.noop import NoopCatalog
+from pyiceberg.catalog.sql import SqlCatalog
from pyiceberg.expressions import BoundReference
from pyiceberg.io import (
ADLS_ACCOUNT_KEY,
@@ -71,6 +74,7 @@ from pyiceberg.serializers import ToOutputFile
from pyiceberg.table import FileScanTask, Table
from pyiceberg.table.metadata import TableMetadataV1, TableMetadataV2,
TableMetadataV3
from pyiceberg.transforms import DayTransform, IdentityTransform
+from pyiceberg.typedef import Identifier
from pyiceberg.types import (
BinaryType,
BooleanType,
@@ -434,17 +438,6 @@ def iceberg_schema_simple_no_ids() -> Schema:
)
[email protected](scope="session")
-def iceberg_table_schema_simple() -> Schema:
- return Schema(
- NestedField(field_id=1, name="foo", field_type=StringType(),
required=False),
- NestedField(field_id=2, name="bar", field_type=IntegerType(),
required=True),
- NestedField(field_id=3, name="baz", field_type=BooleanType(),
required=False),
- schema_id=0,
- identifier_field_ids=[],
- )
-
-
@pytest.fixture(scope="session")
def iceberg_schema_nested() -> Schema:
return Schema(
@@ -1887,7 +1880,7 @@ def test_schema() -> Schema:
@pytest.fixture(scope="session")
-def test_partition_spec() -> Schema:
+def test_partition_spec() -> PartitionSpec:
return PartitionSpec(
PartitionField(1, 1000, IdentityTransform(), "VendorID"),
PartitionField(2, 1001, DayTransform(), "tpep_pickup_day"),
@@ -2960,3 +2953,159 @@ def ray_session() -> Generator[Any, None, None]:
)
yield ray
ray.shutdown()
+
+
+# Catalog fixtures
+
+
+def _create_memory_catalog(name: str, warehouse: Path) -> InMemoryCatalog:
+ return InMemoryCatalog(name, warehouse=f"file://{warehouse}")
+
+
+def _create_sql_catalog(name: str, warehouse: Path) -> SqlCatalog:
+ catalog = SqlCatalog(
+ name,
+ uri="sqlite:///:memory:",
+ warehouse=f"file://{warehouse}",
+ )
+ catalog.create_tables()
+ return catalog
+
+
+def _create_sql_without_rowcount_catalog(name: str, warehouse: Path) ->
SqlCatalog:
+ props = {
+ "uri": f"sqlite:////{warehouse}/sql-catalog",
+ "warehouse": f"file://{warehouse}",
+ }
+ catalog = SqlCatalog(name, **props)
+ catalog.engine.dialect.supports_sane_rowcount = False
+ catalog.create_tables()
+ return catalog
+
+
+_CATALOG_FACTORIES = {
+ "memory": _create_memory_catalog,
+ "sql": _create_sql_catalog,
+ "sql_without_rowcount": _create_sql_without_rowcount_catalog,
+}
+
+
[email protected](params=list(_CATALOG_FACTORIES.keys()))
+def catalog(request: pytest.FixtureRequest, tmp_path: Path) ->
Generator[Catalog, None, None]:
+ """Parameterized fixture that yields catalogs listed in
_CATALOG_FACTORIES."""
+ catalog_type = request.param
+ factory = _CATALOG_FACTORIES[catalog_type]
+ cat = factory("test_catalog", tmp_path)
+ yield cat
+ if hasattr(cat, "destroy_tables"):
+ cat.destroy_tables()
+
+
[email protected](params=list(_CATALOG_FACTORIES.keys()))
+def catalog_with_warehouse(
+ request: pytest.FixtureRequest,
+ warehouse: Path,
+) -> Generator[Catalog, None, None]:
+ factory = _CATALOG_FACTORIES[request.param]
+ cat = factory("test_catalog", warehouse)
+ yield cat
+ if hasattr(cat, "destroy_tables"):
+ cat.destroy_tables()
+
+
[email protected](name="random_table_identifier")
+def fixture_random_table_identifier(warehouse: Path, database_name: str,
table_name: str) -> Identifier:
+ os.makedirs(f"{warehouse}/{database_name}/{table_name}/metadata/",
exist_ok=True)
+ return database_name, table_name
+
+
[email protected](name="another_random_table_identifier")
+def fixture_another_random_table_identifier(warehouse: Path, database_name:
str, table_name: str) -> Identifier:
+ database_name = database_name + "_new"
+ table_name = table_name + "_new"
+ os.makedirs(f"{warehouse}/{database_name}/{table_name}/metadata/",
exist_ok=True)
+ return database_name, table_name
+
+
[email protected](name="random_hierarchical_identifier")
+def fixture_random_hierarchical_identifier(warehouse: Path,
hierarchical_namespace_name: str, table_name: str) -> Identifier:
+
os.makedirs(f"{warehouse}/{hierarchical_namespace_name}/{table_name}/metadata/",
exist_ok=True)
+ return Catalog.identifier_to_tuple(".".join((hierarchical_namespace_name,
table_name)))
+
+
[email protected](name="another_random_hierarchical_identifier")
+def fixture_another_random_hierarchical_identifier(
+ warehouse: Path, hierarchical_namespace_name: str, table_name: str
+) -> Identifier:
+ hierarchical_namespace_name = hierarchical_namespace_name + "_new"
+ table_name = table_name + "_new"
+
os.makedirs(f"{warehouse}/{hierarchical_namespace_name}/{table_name}/metadata/",
exist_ok=True)
+ return Catalog.identifier_to_tuple(".".join((hierarchical_namespace_name,
table_name)))
+
+
[email protected](scope="session")
+def fixed_test_table_identifier() -> Identifier:
+ return "com", "organization", "department", "my_table"
+
+
[email protected](scope="session")
+def another_fixed_test_table_identifier() -> Identifier:
+ return "com", "organization", "department_alt", "my_another_table"
+
+
[email protected](scope="session")
+def fixed_test_table_namespace() -> Identifier:
+ return "com", "organization", "department"
+
+
[email protected](
+ scope="session",
+ params=[
+ lazy_fixture("fixed_test_table_identifier"),
+ lazy_fixture("random_table_identifier"),
+ lazy_fixture("random_hierarchical_identifier"),
+ ],
+)
+def test_table_identifier(request: pytest.FixtureRequest) -> Identifier:
+ return request.param
+
+
[email protected](
+ scope="session",
+ params=[
+ lazy_fixture("another_fixed_test_table_identifier"),
+ lazy_fixture("another_random_table_identifier"),
+ lazy_fixture("another_random_hierarchical_identifier"),
+ ],
+)
+def another_table_identifier(request: pytest.FixtureRequest) -> Identifier:
+ return request.param
+
+
[email protected](
+ params=[
+ lazy_fixture("database_name"),
+ lazy_fixture("hierarchical_namespace_name"),
+ lazy_fixture("fixed_test_table_namespace"),
+ ],
+)
+def test_namespace(request: pytest.FixtureRequest) -> Identifier:
+ ns = request.param
+ if isinstance(ns, tuple):
+ return ns
+ if "." in ns:
+ return tuple(ns.split("."))
+ return (ns,)
+
+
[email protected](scope="session")
+def test_namespace_properties() -> dict[str, str]:
+ return {"key1": "value1", "key2": "value2"}
+
+
[email protected](scope="session")
+def test_table_properties() -> dict[str, str]:
+ return {
+ "key1": "value1",
+ "key2": "value2",
+ }
diff --git a/tests/table/test_upsert.py b/tests/table/test_upsert.py
index 35a3a119..9bc61799 100644
--- a/tests/table/test_upsert.py
+++ b/tests/table/test_upsert.py
@@ -27,11 +27,11 @@ from pyiceberg.expressions import AlwaysTrue, And, EqualTo,
Reference
from pyiceberg.expressions.literals import LongLiteral
from pyiceberg.io.pyarrow import schema_to_pyarrow
from pyiceberg.schema import Schema
-from pyiceberg.table import UpsertResult
+from pyiceberg.table import Table, UpsertResult
from pyiceberg.table.snapshots import Operation
from pyiceberg.table.upsert_util import create_match_filter
from pyiceberg.types import IntegerType, NestedField, StringType, StructType
-from tests.catalog.test_base import InMemoryCatalog, Table
+from tests.catalog.test_base import InMemoryCatalog
@pytest.fixture