This is an automated email from the ASF dual-hosted git repository.
Fokko pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-python.git
The following commit(s) were added to refs/heads/main by this push:
new 0bdff480 Add `@typing.override` to catalog and FileIO implementations
(#3360)
0bdff480 is described below
commit 0bdff48005c0969d1bc320c7fdfa5c96f49bb4ae
Author: vishnu prakash <[email protected]>
AuthorDate: Sat May 16 01:59:43 2026 +0530
Add `@typing.override` to catalog and FileIO implementations (#3360)
Adds `@override` from `typing_extensions` to all concrete
implementations of `Catalog`, `MetastoreCatalog`, `FileIO`, `InputFile`,
and `OutputFile` so overridden base-class methods are explicit and can
be verified by static type checkers.
Uses `from typing_extensions import override` (the same pattern already used
for `Self` in `typedef.py`) instead of `from typing import override`,
which requires Python 3.12+ and would break the supported 3.10/3.11
versions.
Closes #1310
# Rationale for this change
`@typing.override` (PEP 698) makes it explicit which methods
intentionally override a parent class method, helping static type
checkers catch mistakes early. The previous attempt (#1312) and the
currently open PR (#3359) both use `from typing import override`, which
only exists in Python 3.12+. This PR uses `typing_extensions` instead, so
every supported Python version (3.10+) keeps working.
## Are these changes tested?
No new tests are needed — `@override` is a static analysis marker that
does not change runtime behavior (at most it sets an `__override__`
attribute on the decorated function). `make lint` and `make test`
(3625 passed) both pass cleanly.
## Are there any user-facing changes?
No.
---
pyiceberg/catalog/__init__.py | 8 ++++++++
pyiceberg/catalog/bigquery_metastore.py | 18 ++++++++++++++++++
pyiceberg/catalog/dynamodb.py | 19 +++++++++++++++++++
pyiceberg/catalog/glue.py | 19 +++++++++++++++++++
pyiceberg/catalog/hive.py | 20 ++++++++++++++++++++
pyiceberg/catalog/noop.py | 25 +++++++++++++++++++++++++
pyiceberg/catalog/rest/__init__.py | 24 ++++++++++++++++++++++++
pyiceberg/catalog/sql.py | 20 ++++++++++++++++++++
pyiceberg/io/fsspec.py | 11 +++++++++++
pyiceberg/io/pyarrow.py | 9 +++++++++
10 files changed, 173 insertions(+)
diff --git a/pyiceberg/catalog/__init__.py b/pyiceberg/catalog/__init__.py
index ef3e51ae..95ceaa53 100644
--- a/pyiceberg/catalog/__init__.py
+++ b/pyiceberg/catalog/__init__.py
@@ -31,6 +31,8 @@ from typing import (
cast,
)
+from typing_extensions import override
+
from pyiceberg.exceptions import (
NamespaceAlreadyExistsError,
NoSuchNamespaceError,
@@ -904,9 +906,11 @@ class MetastoreCatalog(Catalog, ABC):
def __init__(self, name: str, **properties: str):
super().__init__(name, **properties)
+ @override
def supports_server_side_planning(self) -> bool:
return False
+ @override
def create_table_transaction(
self,
identifier: str | Identifier,
@@ -920,6 +924,7 @@ class MetastoreCatalog(Catalog, ABC):
self._create_staged_table(identifier, schema, location,
partition_spec, sort_order, properties)
)
+ @override
def table_exists(self, identifier: str | Identifier) -> bool:
try:
self.load_table(identifier)
@@ -927,6 +932,7 @@ class MetastoreCatalog(Catalog, ABC):
except NoSuchTableError:
return False
+ @override
def namespace_exists(self, namespace: str | Identifier) -> bool:
"""Check if a namespace exists.
@@ -942,6 +948,7 @@ class MetastoreCatalog(Catalog, ABC):
except NoSuchNamespaceError:
return False
+ @override
def purge_table(self, identifier: str | Identifier) -> None:
table = self.load_table(identifier)
self.drop_table(identifier)
@@ -962,6 +969,7 @@ class MetastoreCatalog(Catalog, ABC):
delete_files(io, prev_metadata_files, PREVIOUS_METADATA)
delete_files(io, {table.metadata_location}, METADATA)
+ @override
def create_view(
self,
identifier: str | Identifier,
diff --git a/pyiceberg/catalog/bigquery_metastore.py
b/pyiceberg/catalog/bigquery_metastore.py
index d389e2e1..938ac699 100644
--- a/pyiceberg/catalog/bigquery_metastore.py
+++ b/pyiceberg/catalog/bigquery_metastore.py
@@ -26,6 +26,7 @@ from google.cloud.bigquery.external_config import
ExternalCatalogDatasetOptions,
from google.cloud.bigquery.schema import SerDeInfo, StorageDescriptor
from google.cloud.exceptions import Conflict
from google.oauth2 import service_account
+from typing_extensions import override
from pyiceberg.catalog import WAREHOUSE_LOCATION, MetastoreCatalog,
PropertiesUpdateSummary
from pyiceberg.exceptions import NamespaceAlreadyExistsError,
NoSuchNamespaceError, NoSuchTableError, TableAlreadyExistsError
@@ -101,6 +102,7 @@ class BigQueryMetastoreCatalog(MetastoreCatalog):
self.location = location
self.project_id = project_id
+ @override
def create_table(
self,
identifier: str | Identifier,
@@ -156,6 +158,7 @@ class BigQueryMetastoreCatalog(MetastoreCatalog):
return self.load_table(identifier=identifier)
+ @override
def create_namespace(self, namespace: str | Identifier, properties:
Properties = EMPTY_DICT) -> None:
"""Create a namespace in the catalog.
@@ -177,6 +180,7 @@ class BigQueryMetastoreCatalog(MetastoreCatalog):
except Conflict as e:
raise NamespaceAlreadyExistsError("Namespace {database_name}
already exists") from e
+ @override
def load_table(self, identifier: str | Identifier) -> Table:
"""
Load the table's metadata and returns the table instance.
@@ -205,6 +209,7 @@ class BigQueryMetastoreCatalog(MetastoreCatalog):
except NotFound as e:
raise NoSuchTableError(f"Table does not exist:
{dataset_name}.{table_name}") from e
+ @override
def drop_table(self, identifier: str | Identifier) -> None:
"""Drop a table.
@@ -225,14 +230,17 @@ class BigQueryMetastoreCatalog(MetastoreCatalog):
except NoSuchTableError as e:
raise NoSuchTableError(f"Table does not exist:
{dataset_name}.{table_name}") from e
+ @override
def commit_table(
self, table: Table, requirements: tuple[TableRequirement, ...],
updates: tuple[TableUpdate, ...]
) -> CommitTableResponse:
raise NotImplementedError
+ @override
def rename_table(self, from_identifier: str | Identifier, to_identifier:
str | Identifier) -> Table:
raise NotImplementedError
+ @override
def drop_namespace(self, namespace: str | Identifier) -> None:
database_name = self.identifier_to_database(namespace)
@@ -243,6 +251,7 @@ class BigQueryMetastoreCatalog(MetastoreCatalog):
except NotFound as e:
raise NoSuchNamespaceError(f"Namespace {namespace} does not
exist.") from e
+ @override
def list_tables(self, namespace: str | Identifier) -> list[Identifier]:
database_name = self.identifier_to_database(namespace)
iceberg_tables: list[Identifier] = []
@@ -257,6 +266,7 @@ class BigQueryMetastoreCatalog(MetastoreCatalog):
raise NoSuchNamespaceError(f"Namespace (dataset) '{database_name}'
not found.") from None
return iceberg_tables
+ @override
def list_namespaces(self, namespace: str | Identifier = ()) ->
list[Identifier]:
# Since this catalog only supports one-level namespaces, it always
returns an empty list unless
# passed an empty namespace to list all namespaces within the catalog.
@@ -267,6 +277,7 @@ class BigQueryMetastoreCatalog(MetastoreCatalog):
datasets_iterator = self.client.list_datasets()
return [(dataset.dataset_id,) for dataset in datasets_iterator]
+ @override
def register_table(self, identifier: str | Identifier, metadata_location:
str, overwrite: bool = False) -> Table:
"""Register a new table using existing metadata.
@@ -302,21 +313,27 @@ class BigQueryMetastoreCatalog(MetastoreCatalog):
return self.load_table(identifier=identifier)
+ @override
def list_views(self, namespace: str | Identifier) -> list[Identifier]:
raise NotImplementedError
+ @override
def register_view(self, identifier: str | Identifier, metadata_location:
str) -> View:
raise NotImplementedError
+ @override
def drop_view(self, identifier: str | Identifier) -> None:
raise NotImplementedError
+ @override
def view_exists(self, identifier: str | Identifier) -> bool:
raise NotImplementedError
+ @override
def load_view(self, identifier: str | Identifier) -> View:
raise NotImplementedError
+ @override
def load_namespace_properties(self, namespace: str | Identifier) ->
Properties:
dataset_name = self.identifier_to_database(namespace)
@@ -329,6 +346,7 @@ class BigQueryMetastoreCatalog(MetastoreCatalog):
raise NoSuchNamespaceError(f"Namespace {namespace} not found")
from e
return {}
+ @override
def update_namespace_properties(
self, namespace: str | Identifier, removals: set[str] | None = None,
updates: Properties = EMPTY_DICT
) -> PropertiesUpdateSummary:
diff --git a/pyiceberg/catalog/dynamodb.py b/pyiceberg/catalog/dynamodb.py
index 2aaf11c8..74c0be6c 100644
--- a/pyiceberg/catalog/dynamodb.py
+++ b/pyiceberg/catalog/dynamodb.py
@@ -24,6 +24,7 @@ from typing import (
)
import boto3
+from typing_extensions import override
from pyiceberg.catalog import (
BOTOCORE_SESSION,
@@ -151,6 +152,7 @@ class DynamoDbCatalog(MetastoreCatalog):
else:
return True
+ @override
def create_table(
self,
identifier: str | Identifier,
@@ -210,6 +212,7 @@ class DynamoDbCatalog(MetastoreCatalog):
return self.load_table(identifier=identifier)
+ @override
def register_table(self, identifier: str | Identifier, metadata_location:
str, overwrite: bool = False) -> Table:
"""Register a new table using existing metadata.
@@ -226,6 +229,7 @@ class DynamoDbCatalog(MetastoreCatalog):
"""
raise NotImplementedError
+ @override
def commit_table(
self, table: Table, requirements: tuple[TableRequirement, ...],
updates: tuple[TableUpdate, ...]
) -> CommitTableResponse:
@@ -245,6 +249,7 @@ class DynamoDbCatalog(MetastoreCatalog):
"""
raise NotImplementedError
+ @override
def load_table(self, identifier: str | Identifier) -> Table:
"""
Load the table's metadata and returns the table instance.
@@ -265,6 +270,7 @@ class DynamoDbCatalog(MetastoreCatalog):
dynamo_table_item =
self._get_iceberg_table_item(database_name=database_name, table_name=table_name)
return
self._convert_dynamo_table_item_to_iceberg_table(dynamo_table_item=dynamo_table_item)
+ @override
def drop_table(self, identifier: str | Identifier) -> None:
"""Drop a table.
@@ -285,6 +291,7 @@ class DynamoDbCatalog(MetastoreCatalog):
except ConditionalCheckFailedException as e:
raise NoSuchTableError(f"Table does not exist:
{database_name}.{table_name}") from e
+ @override
def rename_table(self, from_identifier: str | Identifier, to_identifier:
str | Identifier) -> Table:
"""Rename a fully classified table name.
@@ -351,6 +358,7 @@ class DynamoDbCatalog(MetastoreCatalog):
return self.load_table(to_identifier)
+ @override
def create_namespace(self, namespace: str | Identifier, properties:
Properties = EMPTY_DICT) -> None:
"""Create a namespace in the catalog.
@@ -372,6 +380,7 @@ class DynamoDbCatalog(MetastoreCatalog):
except ConditionalCheckFailedException as e:
raise NamespaceAlreadyExistsError(f"Database {database_name}
already exists") from e
+ @override
def drop_namespace(self, namespace: str | Identifier) -> None:
"""Drop a namespace.
@@ -399,6 +408,7 @@ class DynamoDbCatalog(MetastoreCatalog):
except ConditionalCheckFailedException as e:
raise NoSuchNamespaceError(f"Database does not exist:
{database_name}") from e
+ @override
def list_tables(self, namespace: str | Identifier) -> list[Identifier]:
"""List Iceberg tables under the given namespace in the catalog.
@@ -443,6 +453,7 @@ class DynamoDbCatalog(MetastoreCatalog):
return table_identifiers
+ @override
def list_namespaces(self, namespace: str | Identifier = ()) ->
list[Identifier]:
"""List top-level namespaces from the catalog.
@@ -485,6 +496,7 @@ class DynamoDbCatalog(MetastoreCatalog):
return database_identifiers
+ @override
def load_namespace_properties(self, namespace: str | Identifier) ->
Properties:
"""
Get properties for a namespace.
@@ -503,6 +515,7 @@ class DynamoDbCatalog(MetastoreCatalog):
namespace_dict = _convert_dynamo_item_to_regular_dict(namespace_item)
return _get_namespace_properties(namespace_dict=namespace_dict)
+ @override
def update_namespace_properties(
self, namespace: str | Identifier, removals: set[str] | None = None,
updates: Properties = EMPTY_DICT
) -> PropertiesUpdateSummary:
@@ -540,6 +553,7 @@ class DynamoDbCatalog(MetastoreCatalog):
return properties_update_summary
+ @override
def create_view(
self,
identifier: str | Identifier,
@@ -550,18 +564,23 @@ class DynamoDbCatalog(MetastoreCatalog):
) -> View:
raise NotImplementedError
+ @override
def list_views(self, namespace: str | Identifier) -> list[Identifier]:
raise NotImplementedError
+ @override
def register_view(self, identifier: str | Identifier, metadata_location:
str) -> View:
raise NotImplementedError
+ @override
def drop_view(self, identifier: str | Identifier) -> None:
raise NotImplementedError
+ @override
def view_exists(self, identifier: str | Identifier) -> bool:
raise NotImplementedError
+ @override
def load_view(self, identifier: str | Identifier) -> View:
raise NotImplementedError
diff --git a/pyiceberg/catalog/glue.py b/pyiceberg/catalog/glue.py
index 92d0aa45..12b36efc 100644
--- a/pyiceberg/catalog/glue.py
+++ b/pyiceberg/catalog/glue.py
@@ -27,6 +27,7 @@ from typing import (
import boto3
from botocore.config import Config
+from typing_extensions import override
from pyiceberg.catalog import (
BOTOCORE_SESSION,
@@ -538,6 +539,7 @@ class GlueCatalog(MetastoreCatalog):
catalog=self,
)
+ @override
def create_table(
self,
identifier: str | Identifier,
@@ -601,6 +603,7 @@ class GlueCatalog(MetastoreCatalog):
catalog=self,
)
+ @override
def register_table(self, identifier: str | Identifier, metadata_location:
str, overwrite: bool = False) -> Table:
"""Register a new table using existing metadata.
@@ -627,6 +630,7 @@ class GlueCatalog(MetastoreCatalog):
self._create_glue_table(database_name=database_name,
table_name=table_name, table_input=table_input)
return self.load_table(identifier=identifier)
+ @override
def commit_table(
self, table: Table, requirements: tuple[TableRequirement, ...],
updates: tuple[TableUpdate, ...]
) -> CommitTableResponse:
@@ -710,6 +714,7 @@ class GlueCatalog(MetastoreCatalog):
metadata=updated_staged_table.metadata,
metadata_location=updated_staged_table.metadata_location
)
+ @override
def load_table(self, identifier: str | Identifier) -> Table:
"""Load the table's metadata and returns the table instance.
@@ -729,6 +734,7 @@ class GlueCatalog(MetastoreCatalog):
return
self._convert_glue_to_iceberg(self._get_glue_table(database_name=database_name,
table_name=table_name))
+ @override
def drop_table(self, identifier: str | Identifier) -> None:
"""Drop a table.
@@ -744,6 +750,7 @@ class GlueCatalog(MetastoreCatalog):
except self.glue.exceptions.EntityNotFoundException as e:
raise NoSuchTableError(f"Table does not exist:
{database_name}.{table_name}") from e
+ @override
def rename_table(self, from_identifier: str | Identifier, to_identifier:
str | Identifier) -> Table:
"""Rename a fully classified table name.
@@ -804,6 +811,7 @@ class GlueCatalog(MetastoreCatalog):
return self.load_table(to_identifier)
+ @override
def create_namespace(self, namespace: str | Identifier, properties:
Properties = EMPTY_DICT) -> None:
"""Create a namespace in the catalog.
@@ -821,6 +829,7 @@ class GlueCatalog(MetastoreCatalog):
except self.glue.exceptions.AlreadyExistsException as e:
raise NamespaceAlreadyExistsError(f"Database {database_name}
already exists") from e
+ @override
def drop_namespace(self, namespace: str | Identifier) -> None:
"""Drop a namespace.
@@ -850,6 +859,7 @@ class GlueCatalog(MetastoreCatalog):
)
self.glue.delete_database(Name=database_name)
+ @override
def list_tables(self, namespace: str | Identifier) -> list[Identifier]:
"""List Iceberg tables under the given namespace in the catalog.
@@ -881,6 +891,7 @@ class GlueCatalog(MetastoreCatalog):
raise NoSuchNamespaceError(f"Database does not exist:
{database_name}") from e
return [(database_name, table["Name"]) for table in table_list if
self.__is_iceberg_table(table)]
+ @override
def list_namespaces(self, namespace: str | Identifier = ()) ->
list[Identifier]:
"""List namespaces from the given namespace. If not given, list
top-level namespaces from the catalog.
@@ -903,6 +914,7 @@ class GlueCatalog(MetastoreCatalog):
return [self.identifier_to_tuple(database["Name"]) for database in
database_list]
+ @override
def load_namespace_properties(self, namespace: str | Identifier) ->
Properties:
"""Get properties for a namespace.
@@ -933,6 +945,7 @@ class GlueCatalog(MetastoreCatalog):
return properties
+ @override
def update_namespace_properties(
self, namespace: str | Identifier, removals: set[str] | None = None,
updates: Properties = EMPTY_DICT
) -> PropertiesUpdateSummary:
@@ -957,6 +970,7 @@ class GlueCatalog(MetastoreCatalog):
return properties_update_summary
+ @override
def create_view(
self,
identifier: str | Identifier,
@@ -967,18 +981,23 @@ class GlueCatalog(MetastoreCatalog):
) -> View:
raise NotImplementedError
+ @override
def list_views(self, namespace: str | Identifier) -> list[Identifier]:
raise NotImplementedError
+ @override
def register_view(self, identifier: str | Identifier, metadata_location:
str) -> View:
raise NotImplementedError
+ @override
def drop_view(self, identifier: str | Identifier) -> None:
raise NotImplementedError
+ @override
def view_exists(self, identifier: str | Identifier) -> bool:
raise NotImplementedError
+ @override
def load_view(self, identifier: str | Identifier) -> View:
raise NotImplementedError
diff --git a/pyiceberg/catalog/hive.py b/pyiceberg/catalog/hive.py
index 8d2a7430..d4ab6c8b 100644
--- a/pyiceberg/catalog/hive.py
+++ b/pyiceberg/catalog/hive.py
@@ -50,6 +50,7 @@ from hive_metastore.ttypes import Table as HiveTable
from tenacity import retry, retry_if_exception_type, stop_after_attempt,
wait_exponential
from thrift.protocol import TBinaryProtocol
from thrift.transport import TSocket, TTransport
+from typing_extensions import override
from pyiceberg.catalog import (
EXTERNAL_TABLE,
@@ -390,6 +391,7 @@ class HiveCatalog(MetastoreCatalog):
except NoSuchObjectException as e:
raise NoSuchTableError(f"Table does not exists: {table_name}")
from e
+ @override
def create_table(
self,
identifier: str | Identifier,
@@ -436,6 +438,7 @@ class HiveCatalog(MetastoreCatalog):
return self._convert_hive_into_iceberg(hive_table)
+ @override
def create_view(
self,
identifier: str | Identifier,
@@ -446,6 +449,7 @@ class HiveCatalog(MetastoreCatalog):
) -> View:
raise NotImplementedError
+ @override
def register_table(self, identifier: str | Identifier, metadata_location:
str, overwrite: bool = False) -> Table:
"""Register a new table using existing metadata.
@@ -480,12 +484,15 @@ class HiveCatalog(MetastoreCatalog):
return self._convert_hive_into_iceberg(hive_table)
+ @override
def list_views(self, namespace: str | Identifier) -> list[Identifier]:
raise NotImplementedError
+ @override
def view_exists(self, identifier: str | Identifier) -> bool:
raise NotImplementedError
+ @override
def load_view(self, identifier: str | Identifier) -> View:
raise NotImplementedError
@@ -518,6 +525,7 @@ class HiveCatalog(MetastoreCatalog):
return _do_wait_for_lock()
+ @override
def commit_table(
self, table: Table, requirements: tuple[TableRequirement, ...],
updates: tuple[TableUpdate, ...]
) -> CommitTableResponse:
@@ -630,6 +638,7 @@ class HiveCatalog(MetastoreCatalog):
metadata=updated_staged_table.metadata,
metadata_location=updated_staged_table.metadata_location
)
+ @override
def load_table(self, identifier: str | Identifier) -> Table:
"""Load the table's metadata and return the table instance.
@@ -652,6 +661,7 @@ class HiveCatalog(MetastoreCatalog):
return self._convert_hive_into_iceberg(hive_table)
+ @override
def drop_table(self, identifier: str | Identifier) -> None:
"""Drop a table.
@@ -669,10 +679,12 @@ class HiveCatalog(MetastoreCatalog):
# When the namespace doesn't exist, it throws the same error
raise NoSuchTableError(f"Table does not exists: {table_name}")
from e
+ @override
def purge_table(self, identifier: str | Identifier) -> None:
# This requires to traverse the reachability set, and drop all the
data files.
raise NotImplementedError("Not yet implemented")
+ @override
def rename_table(self, from_identifier: str | Identifier, to_identifier:
str | Identifier) -> Table:
"""Rename a fully classified table name.
@@ -712,6 +724,7 @@ class HiveCatalog(MetastoreCatalog):
raise NoSuchNamespaceError(f"Database does not exists:
{to_database_name}") from e
return self.load_table(to_identifier)
+ @override
def create_namespace(self, namespace: str | Identifier, properties:
Properties = EMPTY_DICT) -> None:
"""Create a namespace in the catalog.
@@ -732,6 +745,7 @@ class HiveCatalog(MetastoreCatalog):
except AlreadyExistsException as e:
raise NamespaceAlreadyExistsError(f"Database {database_name}
already exists") from e
+ @override
def drop_namespace(self, namespace: str | Identifier) -> None:
"""Drop a namespace.
@@ -751,6 +765,7 @@ class HiveCatalog(MetastoreCatalog):
except (MetaException, NoSuchObjectException) as e:
raise NoSuchNamespaceError(f"Database does not exists:
{database_name}") from e
+ @override
def list_tables(self, namespace: str | Identifier) -> list[Identifier]:
"""List Iceberg tables under the given namespace in the catalog.
@@ -775,6 +790,7 @@ class HiveCatalog(MetastoreCatalog):
if table.parameters.get(TABLE_TYPE, "").lower() == ICEBERG
]
+ @override
def list_namespaces(self, namespace: str | Identifier = ()) ->
list[Identifier]:
"""List namespaces from the given namespace. If not given, list
top-level namespaces from the catalog.
@@ -788,6 +804,7 @@ class HiveCatalog(MetastoreCatalog):
with self._client as open_client:
return list(map(self.identifier_to_tuple,
open_client.get_all_databases()))
+ @override
def load_namespace_properties(self, namespace: str | Identifier) ->
Properties:
"""Get properties for a namespace.
@@ -812,6 +829,7 @@ class HiveCatalog(MetastoreCatalog):
except NoSuchObjectException as e:
raise NoSuchNamespaceError(f"Database does not exists:
{database_name}") from e
+ @override
def update_namespace_properties(
self, namespace: str | Identifier, removals: set[str] | None = None,
updates: Properties = EMPTY_DICT
) -> PropertiesUpdateSummary:
@@ -854,9 +872,11 @@ class HiveCatalog(MetastoreCatalog):
return PropertiesUpdateSummary(removed=list(removed or []),
updated=list(updated or []), missing=list(expected_to_change))
+ @override
def register_view(self, identifier: str | Identifier, metadata_location:
str) -> View:
raise NotImplementedError
+ @override
def drop_view(self, identifier: str | Identifier) -> None:
raise NotImplementedError
diff --git a/pyiceberg/catalog/noop.py b/pyiceberg/catalog/noop.py
index 3a1a8e7c..aeb3c728 100644
--- a/pyiceberg/catalog/noop.py
+++ b/pyiceberg/catalog/noop.py
@@ -20,6 +20,8 @@ from typing import (
TYPE_CHECKING,
)
+from typing_extensions import override
+
from pyiceberg.catalog import Catalog, PropertiesUpdateSummary
from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec
from pyiceberg.schema import Schema
@@ -42,6 +44,7 @@ if TYPE_CHECKING:
class NoopCatalog(Catalog):
+ @override
def create_table(
self,
identifier: str | Identifier,
@@ -53,6 +56,7 @@ class NoopCatalog(Catalog):
) -> Table:
raise NotImplementedError
+ @override
def create_table_transaction(
self,
identifier: str | Identifier,
@@ -64,12 +68,15 @@ class NoopCatalog(Catalog):
) -> CreateTableTransaction:
raise NotImplementedError
+ @override
def load_table(self, identifier: str | Identifier) -> Table:
raise NotImplementedError
+ @override
def table_exists(self, identifier: str | Identifier) -> bool:
raise NotImplementedError
+ @override
def register_table(self, identifier: str | Identifier, metadata_location:
str, overwrite: bool = False) -> Table:
"""Register a new table using existing metadata.
@@ -86,58 +93,75 @@ class NoopCatalog(Catalog):
"""
raise NotImplementedError
+ @override
def drop_table(self, identifier: str | Identifier) -> None:
raise NotImplementedError
+ @override
def supports_server_side_planning(self) -> bool:
return False
+ @override
def purge_table(self, identifier: str | Identifier) -> None:
raise NotImplementedError
+ @override
def rename_table(self, from_identifier: str | Identifier, to_identifier:
str | Identifier) -> Table:
raise NotImplementedError
+ @override
def commit_table(
self, table: Table, requirements: tuple[TableRequirement, ...],
updates: tuple[TableUpdate, ...]
) -> CommitTableResponse:
raise NotImplementedError
+ @override
def create_namespace(self, namespace: str | Identifier, properties:
Properties = EMPTY_DICT) -> None:
raise NotImplementedError
+ @override
def drop_namespace(self, namespace: str | Identifier) -> None:
raise NotImplementedError
+ @override
def list_tables(self, namespace: str | Identifier) -> list[Identifier]:
raise NotImplementedError
+ @override
def list_namespaces(self, namespace: str | Identifier = ()) ->
list[Identifier]:
raise NotImplementedError
+ @override
def load_namespace_properties(self, namespace: str | Identifier) ->
Properties:
raise NotImplementedError
+ @override
def update_namespace_properties(
self, namespace: str | Identifier, removals: set[str] | None = None,
updates: Properties = EMPTY_DICT
) -> PropertiesUpdateSummary:
raise NotImplementedError
+ @override
def list_views(self, namespace: str | Identifier) -> list[Identifier]:
raise NotImplementedError
+ @override
def view_exists(self, identifier: str | Identifier) -> bool:
raise NotImplementedError
+ @override
def namespace_exists(self, namespace: str | Identifier) -> bool:
raise NotImplementedError
+ @override
def register_view(self, identifier: str | Identifier, metadata_location:
str) -> View:
raise NotImplementedError
+ @override
def drop_view(self, identifier: str | Identifier) -> None:
raise NotImplementedError
+ @override
def create_view(
self,
identifier: str | Identifier,
@@ -148,5 +172,6 @@ class NoopCatalog(Catalog):
) -> View:
raise NotImplementedError
+ @override
def load_view(self, identifier: str | Identifier) -> View:
raise NotImplementedError
diff --git a/pyiceberg/catalog/rest/__init__.py
b/pyiceberg/catalog/rest/__init__.py
index 1832f6e1..7fa81312 100644
--- a/pyiceberg/catalog/rest/__init__.py
+++ b/pyiceberg/catalog/rest/__init__.py
@@ -27,6 +27,7 @@ from urllib.parse import quote, unquote
from pydantic import ConfigDict, Field, TypeAdapter, field_validator
from requests import HTTPError, Session
from tenacity import RetryCallState, retry, retry_if_exception_type,
stop_after_attempt
+from typing_extensions import override
from pyiceberg import __version__
from pyiceberg.catalog import BOTOCORE_SESSION, TOKEN, URI,
WAREHOUSE_LOCATION, Catalog, PropertiesUpdateSummary
@@ -476,6 +477,7 @@ class RestCatalog(Catalog):
merged_properties[AUTH_MANAGER] = self._auth_manager
return load_file_io(merged_properties, location)
+ @override
def supports_server_side_planning(self) -> bool:
"""Check if the catalog supports server-side scan planning."""
return Capability.V1_SUBMIT_TABLE_SCAN_PLAN in
self._supported_endpoints and property_as_bool(
@@ -906,6 +908,7 @@ class RestCatalog(Catalog):
_handle_non_200_response(exc, {409: TableAlreadyExistsError, 404:
NoSuchNamespaceError})
return TableResponse.model_validate_json(response.text)
+ @override
@retry(**_RETRY_ARGS)
def create_table(
self,
@@ -927,6 +930,7 @@ class RestCatalog(Catalog):
)
return self._response_to_table(self.identifier_to_tuple(identifier),
table_response)
+ @override
@retry(**_RETRY_ARGS)
def create_table_transaction(
self,
@@ -949,6 +953,7 @@ class RestCatalog(Catalog):
staged_table =
self._response_to_staged_table(self.identifier_to_tuple(identifier),
table_response)
return CreateTableTransaction(staged_table)
+ @override
@retry(**_RETRY_ARGS)
def create_view(
self,
@@ -988,6 +993,7 @@ class RestCatalog(Catalog):
return self._response_to_view(self.identifier_to_tuple(identifier),
view_response)
@retry(**_RETRY_ARGS)
+ @override
def register_table(self, identifier: str | Identifier, metadata_location:
str, overwrite: bool = False) -> Table:
"""Register a new table using existing metadata.
@@ -1023,6 +1029,7 @@ class RestCatalog(Catalog):
return self._response_to_table(self.identifier_to_tuple(identifier),
table_response)
@retry(**_RETRY_ARGS)
+ @override
def list_tables(self, namespace: str | Identifier) -> list[Identifier]:
self._check_endpoint(Capability.V1_LIST_TABLES)
namespace_tuple = self._check_valid_namespace_identifier(namespace)
@@ -1035,6 +1042,7 @@ class RestCatalog(Catalog):
return [(*table.namespace, table.name) for table in
ListTablesResponse.model_validate_json(response.text).identifiers]
@retry(**_RETRY_ARGS)
+ @override
def load_table(self, identifier: str | Identifier) -> Table:
self._check_endpoint(Capability.V1_LOAD_TABLE)
params = {}
@@ -1056,6 +1064,7 @@ class RestCatalog(Catalog):
return self._response_to_table(self.identifier_to_tuple(identifier),
table_response)
@retry(**_RETRY_ARGS)
+ @override
def drop_table(self, identifier: str | Identifier, purge_requested: bool =
False) -> None:
self._check_endpoint(Capability.V1_DELETE_TABLE)
response = self._session.delete(
@@ -1068,10 +1077,12 @@ class RestCatalog(Catalog):
_handle_non_200_response(exc, {404: NoSuchTableError})
@retry(**_RETRY_ARGS)
+ @override
def purge_table(self, identifier: str | Identifier) -> None:
self.drop_table(identifier=identifier, purge_requested=True)
@retry(**_RETRY_ARGS)
+ @override
def rename_table(self, from_identifier: str | Identifier, to_identifier:
str | Identifier) -> Table:
self._check_endpoint(Capability.V1_RENAME_TABLE)
payload = {
@@ -1108,6 +1119,7 @@ class RestCatalog(Catalog):
return table_request
@retry(**_RETRY_ARGS)
+ @override
def list_views(self, namespace: str | Identifier) -> list[Identifier]:
if Capability.V1_LIST_VIEWS not in self._supported_endpoints:
return []
@@ -1136,6 +1148,7 @@ class RestCatalog(Catalog):
return views
@retry(**_RETRY_ARGS)
+ @override
def load_view(self, identifier: str | Identifier) -> View:
self._check_endpoint(Capability.V1_LOAD_VIEW)
response = self._session.get(
@@ -1150,6 +1163,7 @@ class RestCatalog(Catalog):
return self._response_to_view(self.identifier_to_tuple(identifier),
view_response)
@retry(**_RETRY_ARGS)
+ @override
def commit_table(
self, table: Table, requirements: tuple[TableRequirement, ...],
updates: tuple[TableUpdate, ...]
) -> CommitTableResponse:
@@ -1197,6 +1211,7 @@ class RestCatalog(Catalog):
return CommitTableResponse.model_validate_json(response.text)
@retry(**_RETRY_ARGS)
+ @override
def create_namespace(self, namespace: str | Identifier, properties:
Properties = EMPTY_DICT) -> None:
self._check_endpoint(Capability.V1_CREATE_NAMESPACE)
namespace_tuple = self._check_valid_namespace_identifier(namespace)
@@ -1208,6 +1223,7 @@ class RestCatalog(Catalog):
_handle_non_200_response(exc, {409: NamespaceAlreadyExistsError})
@retry(**_RETRY_ARGS)
+ @override
def drop_namespace(self, namespace: str | Identifier) -> None:
self._check_endpoint(Capability.V1_DELETE_NAMESPACE)
namespace_tuple = self._check_valid_namespace_identifier(namespace)
@@ -1219,6 +1235,7 @@ class RestCatalog(Catalog):
_handle_non_200_response(exc, {404: NoSuchNamespaceError, 409:
NamespaceNotEmptyError})
@retry(**_RETRY_ARGS)
+ @override
def list_namespaces(self, namespace: str | Identifier = ()) ->
list[Identifier]:
self._check_endpoint(Capability.V1_LIST_NAMESPACES)
namespace_tuple = self.identifier_to_tuple(namespace)
@@ -1237,6 +1254,7 @@ class RestCatalog(Catalog):
return
ListNamespaceResponse.model_validate_json(response.text).namespaces
@retry(**_RETRY_ARGS)
+ @override
def load_namespace_properties(self, namespace: str | Identifier) ->
Properties:
self._check_endpoint(Capability.V1_LOAD_NAMESPACE)
namespace_tuple = self._check_valid_namespace_identifier(namespace)
@@ -1250,6 +1268,7 @@ class RestCatalog(Catalog):
return NamespaceResponse.model_validate_json(response.text).properties
@retry(**_RETRY_ARGS)
+ @override
def update_namespace_properties(
self, namespace: str | Identifier, removals: set[str] | None = None,
updates: Properties = EMPTY_DICT
) -> PropertiesUpdateSummary:
@@ -1270,6 +1289,7 @@ class RestCatalog(Catalog):
)
@retry(**_RETRY_ARGS)
+ @override
def namespace_exists(self, namespace: str | Identifier) -> bool:
namespace_tuple = self._check_valid_namespace_identifier(namespace)
namespace = self._encode_namespace_path(namespace_tuple)
@@ -1297,6 +1317,7 @@ class RestCatalog(Catalog):
return False
@retry(**_RETRY_ARGS)
+ @override
def table_exists(self, identifier: str | Identifier) -> bool:
"""Check if a table exists.
@@ -1331,6 +1352,7 @@ class RestCatalog(Catalog):
return False
@retry(**_RETRY_ARGS)
+ @override
def view_exists(self, identifier: str | Identifier) -> bool:
"""Check if a view exists.
@@ -1356,6 +1378,7 @@ class RestCatalog(Catalog):
return False
@retry(**_RETRY_ARGS)
+ @override
def register_view(self, identifier: str | Identifier, metadata_location:
str) -> View:
self._check_endpoint(Capability.V1_REGISTER_VIEW)
namespace_and_view = self._split_identifier_for_path(identifier,
IdentifierKind.VIEW)
@@ -1379,6 +1402,7 @@ class RestCatalog(Catalog):
return self._response_to_view(self.identifier_to_tuple(identifier),
view_response)
@retry(**_RETRY_ARGS)
+ @override
def drop_view(self, identifier: str) -> None:
self._check_endpoint(Capability.V1_DELETE_VIEW)
response = self._session.delete(
diff --git a/pyiceberg/catalog/sql.py b/pyiceberg/catalog/sql.py
index 92ac5375..87446bd5 100644
--- a/pyiceberg/catalog/sql.py
+++ b/pyiceberg/catalog/sql.py
@@ -38,6 +38,7 @@ from sqlalchemy.orm import (
Session,
mapped_column,
)
+from typing_extensions import override
from pyiceberg.catalog import (
METADATA_LOCATION,
@@ -172,6 +173,7 @@ class SqlCatalog(MetastoreCatalog):
catalog=self,
)
+ @override
def create_table(
self,
identifier: str | Identifier,
@@ -237,6 +239,7 @@ class SqlCatalog(MetastoreCatalog):
return self.load_table(identifier=identifier)
+ @override
def register_table(self, identifier: str | Identifier, metadata_location:
str, overwrite: bool = False) -> Table:
"""Register a new table using existing metadata.
@@ -278,6 +281,7 @@ class SqlCatalog(MetastoreCatalog):
return self.load_table(identifier=identifier)
+ @override
def load_table(self, identifier: str | Identifier) -> Table:
"""Load the table's metadata and return the table instance.
@@ -307,6 +311,7 @@ class SqlCatalog(MetastoreCatalog):
return self._convert_orm_to_iceberg(result)
raise NoSuchTableError(f"Table does not exist:
{namespace}.{table_name}")
+ @override
def drop_table(self, identifier: str | Identifier) -> None:
"""Drop a table.
@@ -347,6 +352,7 @@ class SqlCatalog(MetastoreCatalog):
raise NoSuchTableError(f"Table does not exist:
{namespace}.{table_name}") from e
session.commit()
+ @override
def rename_table(self, from_identifier: str | Identifier, to_identifier:
str | Identifier) -> Table:
"""Rename a fully classified table name.
@@ -406,6 +412,7 @@ class SqlCatalog(MetastoreCatalog):
raise TableAlreadyExistsError(f"Table
{to_namespace}.{to_table_name} already exists") from e
return self.load_table(to_identifier)
+ @override
def commit_table(
self, table: Table, requirements: tuple[TableRequirement, ...],
updates: tuple[TableUpdate, ...]
) -> CommitTableResponse:
@@ -502,6 +509,7 @@ class SqlCatalog(MetastoreCatalog):
metadata=updated_staged_table.metadata,
metadata_location=updated_staged_table.metadata_location
)
+ @override
def namespace_exists(self, identifier: str | Identifier) -> bool:
namespace_tuple = Catalog.identifier_to_tuple(identifier)
namespace = Catalog.namespace_to_string(namespace_tuple,
NoSuchNamespaceError)
@@ -534,6 +542,7 @@ class SqlCatalog(MetastoreCatalog):
return True
return False
+ @override
def create_namespace(self, namespace: str | Identifier, properties:
Properties = EMPTY_DICT) -> None:
"""Create a namespace in the catalog.
@@ -562,6 +571,7 @@ class SqlCatalog(MetastoreCatalog):
)
session.commit()
+ @override
def drop_namespace(self, namespace: str | Identifier) -> None:
"""Drop a namespace.
@@ -588,6 +598,7 @@ class SqlCatalog(MetastoreCatalog):
)
session.commit()
+ @override
def list_tables(self, namespace: str | Identifier) -> list[Identifier]:
"""List tables under the given namespace in the catalog.
@@ -609,6 +620,7 @@ class SqlCatalog(MetastoreCatalog):
result = session.scalars(stmt)
return [(Catalog.identifier_to_tuple(table.table_namespace) +
(table.table_name,)) for table in result]
+ @override
def list_namespaces(self, namespace: str | Identifier = ()) ->
list[Identifier]:
"""List namespaces from the given namespace. If not given, list
top-level namespaces from the catalog.
@@ -650,6 +662,7 @@ class SqlCatalog(MetastoreCatalog):
return namespaces
+ @override
def load_namespace_properties(self, namespace: str | Identifier) ->
Properties:
"""Get properties for a namespace.
@@ -673,6 +686,7 @@ class SqlCatalog(MetastoreCatalog):
result = session.scalars(stmt)
return {props.property_key: props.property_value for props in
result}
+ @override
def update_namespace_properties(
self, namespace: str | Identifier, removals: set[str] | None = None,
updates: Properties = EMPTY_DICT
) -> PropertiesUpdateSummary:
@@ -729,6 +743,7 @@ class SqlCatalog(MetastoreCatalog):
session.commit()
return properties_update_summary
+ @override
def create_view(
self,
identifier: str | Identifier,
@@ -739,18 +754,23 @@ class SqlCatalog(MetastoreCatalog):
) -> View:
raise NotImplementedError
+ @override
def list_views(self, namespace: str | Identifier) -> list[Identifier]:
raise NotImplementedError
+ @override
def view_exists(self, identifier: str | Identifier) -> bool:
raise NotImplementedError
+ @override
def register_view(self, identifier: str | Identifier, metadata_location:
str) -> View:
raise NotImplementedError
+ @override
def drop_view(self, identifier: str | Identifier) -> None:
raise NotImplementedError
+ @override
def load_view(self, identifier: str | Identifier) -> View:
raise NotImplementedError
diff --git a/pyiceberg/io/fsspec.py b/pyiceberg/io/fsspec.py
index 92255cb8..7749268f 100644
--- a/pyiceberg/io/fsspec.py
+++ b/pyiceberg/io/fsspec.py
@@ -35,6 +35,7 @@ import requests
from fsspec import AbstractFileSystem
from fsspec.implementations.local import LocalFileSystem
from requests import HTTPError
+from typing_extensions import override
from pyiceberg.catalog import TOKEN, URI
from pyiceberg.catalog.rest.auth import AUTH_MANAGER
@@ -332,6 +333,7 @@ class FsspecInputFile(InputFile):
self._fs = fs
super().__init__(location=location)
+ @override
def __len__(self) -> int:
"""Return the total length of the file, in bytes."""
object_info = self._fs.info(self.location)
@@ -341,10 +343,12 @@ class FsspecInputFile(InputFile):
return object_info["size"]
raise RuntimeError(f"Cannot retrieve object info: {self.location}")
+ @override
def exists(self) -> bool:
"""Check whether the location exists."""
return self._fs.lexists(self.location)
+ @override
def open(self, seekable: bool = True) -> InputStream:
"""Create an input stream for reading the contents of the file.
@@ -376,6 +380,7 @@ class FsspecOutputFile(OutputFile):
self._fs = fs
super().__init__(location=location)
+ @override
def __len__(self) -> int:
"""Return the total length of the file, in bytes."""
object_info = self._fs.info(self.location)
@@ -385,10 +390,12 @@ class FsspecOutputFile(OutputFile):
return object_info["size"]
raise RuntimeError(f"Cannot retrieve object info: {self.location}")
+ @override
def exists(self) -> bool:
"""Check whether the location exists."""
return self._fs.lexists(self.location)
+ @override
def create(self, overwrite: bool = False) -> OutputStream:
"""Create an output stream for reading the contents of the file.
@@ -411,6 +418,7 @@ class FsspecOutputFile(OutputFile):
raise FileExistsError(f"Cannot create file, file already exists:
{self.location}")
return self._fs.open(self.location, "wb")
+ @override
def to_input_file(self) -> FsspecInputFile:
"""Return a new FsspecInputFile for the location at `self.location`."""
return FsspecInputFile(location=self.location, fs=self._fs)
@@ -424,6 +432,7 @@ class FsspecFileIO(FileIO):
self._thread_locals = threading.local()
super().__init__(properties=properties)
+ @override
def new_input(self, location: str) -> FsspecInputFile:
"""Get an FsspecInputFile instance to read bytes from the file at the
given location.
@@ -437,6 +446,7 @@ class FsspecFileIO(FileIO):
fs = self._get_fs_from_uri(uri)
return FsspecInputFile(location=location, fs=fs)
+ @override
def new_output(self, location: str) -> FsspecOutputFile:
"""Get an FsspecOutputFile instance to write bytes to the file at the
given location.
@@ -450,6 +460,7 @@ class FsspecFileIO(FileIO):
fs = self._get_fs_from_uri(uri)
return FsspecOutputFile(location=location, fs=fs)
+ @override
def delete(self, location: str | InputFile | OutputFile) -> None:
"""Delete the file at the given location.
diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py
index 4517ae73..d4414c7c 100644
--- a/pyiceberg/io/pyarrow.py
+++ b/pyiceberg/io/pyarrow.py
@@ -63,6 +63,7 @@ from pyarrow.fs import (
FileSystem,
FileType,
)
+from typing_extensions import override
from pyiceberg.conversions import to_bytes
from pyiceberg.exceptions import ResolveError
@@ -300,11 +301,13 @@ class PyArrowFile(InputFile, OutputFile):
raise FileNotFoundError(f"Cannot get file info, file not found:
{self.location}")
return file_info
+ @override
def __len__(self) -> int:
"""Return the total length of the file, in bytes."""
file_info = self._file_info()
return file_info.size
+ @override
def exists(self) -> bool:
"""Check whether the location exists."""
try:
@@ -313,6 +316,7 @@ class PyArrowFile(InputFile, OutputFile):
except FileNotFoundError:
return False
+ @override
def open(self, seekable: bool = True) -> InputStream:
"""Open the location using a PyArrow FileSystem inferred from the
location.
@@ -342,6 +346,7 @@ class PyArrowFile(InputFile, OutputFile):
raise # pragma: no cover - If some other kind of OSError, raise
the raw error
return input_file
+ @override
def create(self, overwrite: bool = False) -> OutputStream:
"""Create a writable pyarrow.lib.NativeFile for this PyArrowFile's
location.
@@ -373,6 +378,7 @@ class PyArrowFile(InputFile, OutputFile):
raise # pragma: no cover - If some other kind of OSError, raise
the raw error
return output_file
+ @override
def to_input_file(self) -> PyArrowFile:
"""Return a new PyArrowFile for the location of an existing
PyArrowFile instance.
@@ -610,6 +616,7 @@ class PyArrowFileIO(FileIO):
def _initialize_local_fs(self) -> FileSystem:
return PyArrowLocalFileSystem()
+ @override
def new_input(self, location: str) -> PyArrowFile:
"""Get a PyArrowFile instance to read bytes from the file at the given
location.
@@ -627,6 +634,7 @@ class PyArrowFileIO(FileIO):
buffer_size=int(self.properties.get(BUFFER_SIZE, ONE_MEGABYTE)),
)
+ @override
def new_output(self, location: str) -> PyArrowFile:
"""Get a PyArrowFile instance to write bytes to the file at the given
location.
@@ -644,6 +652,7 @@ class PyArrowFileIO(FileIO):
buffer_size=int(self.properties.get(BUFFER_SIZE, ONE_MEGABYTE)),
)
+ @override
def delete(self, location: str | InputFile | OutputFile) -> None:
"""Delete the file at the given location.