This is an automated email from the ASF dual-hosted git repository.
fokko pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-python.git
The following commit(s) were added to refs/heads/main by this push:
new 7deb739 Add SqlCatalog `_commit_table` support (#265)
7deb739 is described below
commit 7deb739439b40ff7a7d82bac923e849630ffa92c
Author: Sung Yun <[email protected]>
AuthorDate: Wed Jan 17 07:36:24 2024 -0500
Add SqlCatalog `_commit_table` support (#265)
* sql commit
* SqlCatalog _commit_table
* better variable names
* fallback to FOR UPDATE commit when engine.dialect.supports_sane_rowcount is False
* remove stray print
* wait
* better logging
---
pyiceberg/catalog/sql.py | 132 ++++++++++++++++++++++++++++++++++++++--------
tests/catalog/test_sql.py | 55 +++++++++++++++++++
2 files changed, 165 insertions(+), 22 deletions(-)
diff --git a/pyiceberg/catalog/sql.py b/pyiceberg/catalog/sql.py
index 77ece56..593c6b5 100644
--- a/pyiceberg/catalog/sql.py
+++ b/pyiceberg/catalog/sql.py
@@ -31,7 +31,7 @@ from sqlalchemy import (
union,
update,
)
-from sqlalchemy.exc import IntegrityError, OperationalError
+from sqlalchemy.exc import IntegrityError, NoResultFound, OperationalError
from sqlalchemy.orm import (
DeclarativeBase,
Mapped,
@@ -48,6 +48,7 @@ from pyiceberg.catalog import (
PropertiesUpdateSummary,
)
from pyiceberg.exceptions import (
+ CommitFailedException,
NamespaceAlreadyExistsError,
NamespaceNotEmptyError,
NoSuchNamespaceError,
@@ -59,7 +60,7 @@ from pyiceberg.io import load_file_io
from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec
from pyiceberg.schema import Schema
from pyiceberg.serializers import FromInputFile
-from pyiceberg.table import CommitTableRequest, CommitTableResponse, Table
+from pyiceberg.table import CommitTableRequest, CommitTableResponse, Table, update_table_metadata
from pyiceberg.table.metadata import new_table_metadata
from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder
from pyiceberg.typedef import EMPTY_DICT
@@ -268,16 +269,32 @@ class SqlCatalog(Catalog):
identifier_tuple = self.identifier_to_tuple_without_catalog(identifier)
database_name, table_name =
self.identifier_to_database_and_table(identifier_tuple, NoSuchTableError)
with Session(self.engine) as session:
- res = session.execute(
- delete(IcebergTables).where(
- IcebergTables.catalog_name == self.name,
- IcebergTables.table_namespace == database_name,
- IcebergTables.table_name == table_name,
+ if self.engine.dialect.supports_sane_rowcount:
+ res = session.execute(
+ delete(IcebergTables).where(
+ IcebergTables.catalog_name == self.name,
+ IcebergTables.table_namespace == database_name,
+ IcebergTables.table_name == table_name,
+ )
)
- )
+ if res.rowcount < 1:
+ raise NoSuchTableError(f"Table does not exist:
{database_name}.{table_name}")
+ else:
+ try:
+ tbl = (
+ session.query(IcebergTables)
+ .with_for_update(of=IcebergTables)
+ .filter(
+ IcebergTables.catalog_name == self.name,
+ IcebergTables.table_namespace == database_name,
+ IcebergTables.table_name == table_name,
+ )
+ .one()
+ )
+ session.delete(tbl)
+ except NoResultFound as e:
+ raise NoSuchTableError(f"Table does not exist:
{database_name}.{table_name}") from e
session.commit()
- if res.rowcount < 1:
- raise NoSuchTableError(f"Table does not exist:
{database_name}.{table_name}")
def rename_table(self, from_identifier: Union[str, Identifier],
to_identifier: Union[str, Identifier]) -> Table:
"""Rename a fully classified table name.
@@ -301,18 +318,35 @@ class SqlCatalog(Catalog):
raise NoSuchNamespaceError(f"Namespace does not exist:
{to_database_name}")
with Session(self.engine) as session:
try:
- stmt = (
- update(IcebergTables)
- .where(
- IcebergTables.catalog_name == self.name,
- IcebergTables.table_namespace == from_database_name,
- IcebergTables.table_name == from_table_name,
+ if self.engine.dialect.supports_sane_rowcount:
+ stmt = (
+ update(IcebergTables)
+ .where(
+ IcebergTables.catalog_name == self.name,
+ IcebergTables.table_namespace ==
from_database_name,
+ IcebergTables.table_name == from_table_name,
+ )
+ .values(table_namespace=to_database_name,
table_name=to_table_name)
)
- .values(table_namespace=to_database_name,
table_name=to_table_name)
- )
- result = session.execute(stmt)
- if result.rowcount < 1:
- raise NoSuchTableError(f"Table does not exist:
{from_table_name}")
+ result = session.execute(stmt)
+ if result.rowcount < 1:
+ raise NoSuchTableError(f"Table does not exist:
{from_table_name}")
+ else:
+ try:
+ tbl = (
+ session.query(IcebergTables)
+ .with_for_update(of=IcebergTables)
+ .filter(
+ IcebergTables.catalog_name == self.name,
+ IcebergTables.table_namespace ==
from_database_name,
+ IcebergTables.table_name == from_table_name,
+ )
+ .one()
+ )
+ tbl.table_namespace = to_database_name
+ tbl.table_name = to_table_name
+ except NoResultFound as e:
+ raise NoSuchTableError(f"Table does not exist:
{from_table_name}") from e
session.commit()
except IntegrityError as e:
raise TableAlreadyExistsError(f"Table
{to_database_name}.{to_table_name} already exists") from e
@@ -329,8 +363,62 @@ class SqlCatalog(Catalog):
Raises:
NoSuchTableError: If a table with the given identifier does not
exist.
+ CommitFailedException: If the commit failed.
"""
- raise NotImplementedError
+ identifier_tuple = self.identifier_to_tuple_without_catalog(
+ tuple(table_request.identifier.namespace.root +
[table_request.identifier.name])
+ )
+ current_table = self.load_table(identifier_tuple)
+ database_name, table_name =
self.identifier_to_database_and_table(identifier_tuple, NoSuchTableError)
+ base_metadata = current_table.metadata
+ for requirement in table_request.requirements:
+ requirement.validate(base_metadata)
+
+ updated_metadata = update_table_metadata(base_metadata,
table_request.updates)
+ if updated_metadata == base_metadata:
+ # no changes, do nothing
+ return CommitTableResponse(metadata=base_metadata,
metadata_location=current_table.metadata_location)
+
+ # write new metadata
+ new_metadata_version =
self._parse_metadata_version(current_table.metadata_location) + 1
+ new_metadata_location =
self._get_metadata_location(current_table.metadata.location,
new_metadata_version)
+ self._write_metadata(updated_metadata, current_table.io,
new_metadata_location)
+
+ with Session(self.engine) as session:
+ if self.engine.dialect.supports_sane_rowcount:
+ stmt = (
+ update(IcebergTables)
+ .where(
+ IcebergTables.catalog_name == self.name,
+ IcebergTables.table_namespace == database_name,
+ IcebergTables.table_name == table_name,
+ IcebergTables.metadata_location ==
current_table.metadata_location,
+ )
+ .values(metadata_location=new_metadata_location,
previous_metadata_location=current_table.metadata_location)
+ )
+ result = session.execute(stmt)
+ if result.rowcount < 1:
+ raise CommitFailedException(f"Table has been updated by
another process: {database_name}.{table_name}")
+ else:
+ try:
+ tbl = (
+ session.query(IcebergTables)
+ .with_for_update(of=IcebergTables)
+ .filter(
+ IcebergTables.catalog_name == self.name,
+ IcebergTables.table_namespace == database_name,
+ IcebergTables.table_name == table_name,
+ IcebergTables.metadata_location ==
current_table.metadata_location,
+ )
+ .one()
+ )
+ tbl.metadata_location = new_metadata_location
+ tbl.previous_metadata_location =
current_table.metadata_location
+ except NoResultFound as e:
+ raise CommitFailedException(f"Table has been updated by
another process: {database_name}.{table_name}") from e
+ session.commit()
+
+ return CommitTableResponse(metadata=updated_metadata,
metadata_location=new_metadata_location)
def _namespace_exists(self, identifier: Union[str, Identifier]) -> bool:
namespace = self.identifier_to_database(identifier)
diff --git a/tests/catalog/test_sql.py b/tests/catalog/test_sql.py
index 95dc24a..8bf921a 100644
--- a/tests/catalog/test_sql.py
+++ b/tests/catalog/test_sql.py
@@ -42,6 +42,7 @@ from pyiceberg.table.sorting import (
SortOrder,
)
from pyiceberg.transforms import IdentityTransform
+from pyiceberg.types import IntegerType
@pytest.fixture(name="warehouse", scope="session")
@@ -87,6 +88,19 @@ def catalog_sqlite(warehouse: Path) -> Generator[SqlCatalog,
None, None]:
catalog.destroy_tables()
+@pytest.fixture(scope="module")
+def catalog_sqlite_without_rowcount(warehouse: Path) -> Generator[SqlCatalog,
None, None]:
+ props = {
+ "uri": "sqlite:////tmp/sql-catalog.db",
+ "warehouse": f"file://{warehouse}",
+ }
+ catalog = SqlCatalog("test_sql_catalog", **props)
+ catalog.engine.dialect.supports_sane_rowcount = False
+ catalog.create_tables()
+ yield catalog
+ catalog.destroy_tables()
+
+
def test_creation_with_no_uri() -> None:
with pytest.raises(NoSuchPropertyException):
SqlCatalog("test_ddb_catalog", not_uri="unused")
@@ -305,6 +319,7 @@ def test_load_table_from_self_identifier(catalog:
SqlCatalog, table_schema_neste
[
lazy_fixture('catalog_memory'),
lazy_fixture('catalog_sqlite'),
+ lazy_fixture('catalog_sqlite_without_rowcount'),
],
)
def test_drop_table(catalog: SqlCatalog, table_schema_nested: Schema,
random_identifier: Identifier) -> None:
@@ -322,6 +337,7 @@ def test_drop_table(catalog: SqlCatalog,
table_schema_nested: Schema, random_ide
[
lazy_fixture('catalog_memory'),
lazy_fixture('catalog_sqlite'),
+ lazy_fixture('catalog_sqlite_without_rowcount'),
],
)
def test_drop_table_from_self_identifier(catalog: SqlCatalog,
table_schema_nested: Schema, random_identifier: Identifier) -> None:
@@ -341,6 +357,7 @@ def test_drop_table_from_self_identifier(catalog:
SqlCatalog, table_schema_neste
[
lazy_fixture('catalog_memory'),
lazy_fixture('catalog_sqlite'),
+ lazy_fixture('catalog_sqlite_without_rowcount'),
],
)
def test_drop_table_that_does_not_exist(catalog: SqlCatalog,
random_identifier: Identifier) -> None:
@@ -353,6 +370,7 @@ def test_drop_table_that_does_not_exist(catalog:
SqlCatalog, random_identifier:
[
lazy_fixture('catalog_memory'),
lazy_fixture('catalog_sqlite'),
+ lazy_fixture('catalog_sqlite_without_rowcount'),
],
)
def test_rename_table(
@@ -377,6 +395,7 @@ def test_rename_table(
[
lazy_fixture('catalog_memory'),
lazy_fixture('catalog_sqlite'),
+ lazy_fixture('catalog_sqlite_without_rowcount'),
],
)
def test_rename_table_from_self_identifier(
@@ -403,6 +422,7 @@ def test_rename_table_from_self_identifier(
[
lazy_fixture('catalog_memory'),
lazy_fixture('catalog_sqlite'),
+ lazy_fixture('catalog_sqlite_without_rowcount'),
],
)
def test_rename_table_to_existing_one(
@@ -425,6 +445,7 @@ def test_rename_table_to_existing_one(
[
lazy_fixture('catalog_memory'),
lazy_fixture('catalog_sqlite'),
+ lazy_fixture('catalog_sqlite_without_rowcount'),
],
)
def test_rename_missing_table(catalog: SqlCatalog, random_identifier:
Identifier, another_random_identifier: Identifier) -> None:
@@ -439,6 +460,7 @@ def test_rename_missing_table(catalog: SqlCatalog,
random_identifier: Identifier
[
lazy_fixture('catalog_memory'),
lazy_fixture('catalog_sqlite'),
+ lazy_fixture('catalog_sqlite_without_rowcount'),
],
)
def test_rename_table_to_missing_namespace(
@@ -664,3 +686,36 @@ def test_update_namespace_properties(catalog: SqlCatalog,
database_name: str) ->
else:
assert k in update_report.removed
assert "updated test description" ==
catalog.load_namespace_properties(database_name)["comment"]
+
+
+@pytest.mark.parametrize(
+ 'catalog',
+ [
+ lazy_fixture('catalog_memory'),
+ lazy_fixture('catalog_sqlite'),
+ lazy_fixture('catalog_sqlite_without_rowcount'),
+ ],
+)
+def test_commit_table(catalog: SqlCatalog, table_schema_nested: Schema,
random_identifier: Identifier) -> None:
+ database_name, _table_name = random_identifier
+ catalog.create_namespace(database_name)
+ table = catalog.create_table(random_identifier, table_schema_nested)
+
+ assert catalog._parse_metadata_version(table.metadata_location) == 0
+ assert table.metadata.current_schema_id == 0
+
+ transaction = table.transaction()
+ update = transaction.update_schema()
+ update.add_column(path="b", field_type=IntegerType())
+ update.commit()
+ transaction.commit_transaction()
+
+ updated_table_metadata = table.metadata
+
+ assert catalog._parse_metadata_version(table.metadata_location) == 1
+ assert updated_table_metadata.current_schema_id == 1
+ assert len(updated_table_metadata.schemas) == 2
+ new_schema = next(schema for schema in updated_table_metadata.schemas if
schema.schema_id == 1)
+ assert new_schema
+ assert new_schema == update._apply()
+ assert new_schema.find_field("b").field_type == IntegerType()