This is an automated email from the ASF dual-hosted git repository.
jshao pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git
The following commit(s) were added to refs/heads/main by this push:
new 3a793f2695 [#9522] feat(client-python): complete relational catalog
(#10575)
3a793f2695 is described below
commit 3a793f26957a5e05d8409b2e42550545b8235bf1
Author: George T. C. Lai <[email protected]>
AuthorDate: Fri Apr 3 15:38:07 2026 +0800
[#9522] feat(client-python): complete relational catalog (#10575)
### What changes were proposed in this pull request?
This PR aims to complete implementation of class `RelationalCatalog`
with respect to the following methods:
- alter_table
- purge_table
- drop_table
A refactor of method `load_table` is also conducted in this PR as well.
### Why are the changes needed?
We need to complete the implementation of `RelationalCatalog`.
Fix: #9522
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Unit tests
---------
Signed-off-by: George T. C. Lai <[email protected]>
---
.../gravitino/api/rel/table_catalog.py | 23 +-
.../gravitino/client/relational_catalog.py | 93 ++++++-
.../dto/requests/table_updates_request.py | 6 +-
.../gravitino/dto/util/dto_converters.py | 159 ++++++++++++
.../test_dto_converters_to_table_update_request.py | 288 +++++++++++++++++++++
.../tests/unittests/test_relational_catalog.py | 176 +++++++++++++
6 files changed, 730 insertions(+), 15 deletions(-)
diff --git a/clients/client-python/gravitino/api/rel/table_catalog.py
b/clients/client-python/gravitino/api/rel/table_catalog.py
index e8a2ec6814..09a8997a5e 100644
--- a/clients/client-python/gravitino/api/rel/table_catalog.py
+++ b/clients/client-python/gravitino/api/rel/table_catalog.py
@@ -18,8 +18,9 @@
from abc import ABC, abstractmethod
from contextlib import suppress
-from typing import Optional
+from typing import Optional, overload
+from gravitino.api.authorization.privileges import Privilege
from gravitino.api.rel.column import Column
from gravitino.api.rel.expressions.distributions.distribution import
Distribution
from gravitino.api.rel.expressions.sorts.sort_order import SortOrder
@@ -54,12 +55,28 @@ class TableCatalog(ABC):
NoSuchSchemaException: If the schema does not exist.
"""
+ @overload
@abstractmethod
- def load_table(self, identifier: NameIdentifier) -> Table:
- """Load table metadata by `NameIdentifier` from the catalog.
+ def load_table(self, identifier: NameIdentifier) -> Table: ...
+
+ @overload
+ @abstractmethod
+ def load_table(
+ self, identifier: NameIdentifier, required_privilege_names:
set[Privilege.Name]
+ ) -> Table: ...
+
+ @abstractmethod
+ def load_table(
+ self,
+ identifier: NameIdentifier,
+ required_privilege_names: Optional[set[Privilege.Name]] = None,
+ ) -> Table:
+ """Load table metadata by `NameIdentifier` from the catalog with
required privileges.
Args:
identifier (NameIdentifier): A table identifier.
+ required_privilege_names (Optional[set[Privilege.Name]], optional):
+ The required privilege names to access the table. Defaults to
`None`.
Returns:
Table: The table metadata.
diff --git a/clients/client-python/gravitino/client/relational_catalog.py
b/clients/client-python/gravitino/client/relational_catalog.py
index 2330334589..c208f96c18 100644
--- a/clients/client-python/gravitino/client/relational_catalog.py
+++ b/clients/client-python/gravitino/client/relational_catalog.py
@@ -15,8 +15,9 @@
# specific language governing permissions and limitations
# under the License.
-from typing import Optional
+from typing import Final, Optional, overload
+from gravitino.api.authorization.privileges import Privilege
from gravitino.api.catalog import Catalog
from gravitino.api.rel.column import Column
from gravitino.api.rel.expressions.distributions.distribution import
Distribution
@@ -30,6 +31,8 @@ from gravitino.client.relational_table import RelationalTable
from gravitino.dto.audit_dto import AuditDTO
from gravitino.dto.rel.distribution_dto import DistributionDTO
from gravitino.dto.requests.table_create_request import TableCreateRequest
+from gravitino.dto.requests.table_updates_request import TableUpdatesRequest
+from gravitino.dto.responses.drop_response import DropResponse
from gravitino.dto.responses.entity_list_response import EntityListResponse
from gravitino.dto.responses.table_response import TableResponse
from gravitino.dto.util.dto_converters import DTOConverters
@@ -40,9 +43,7 @@ from gravitino.rest.rest_utils import encode_string
from gravitino.utils import HTTPClient
-class RelationalCatalog(
- BaseSchemaCatalog, TableCatalog
-): # pylint: disable=too-many-ancestors
+class RelationalCatalog(BaseSchemaCatalog, TableCatalog): # pylint:
disable=too-many-ancestors
"""Relational catalog is a catalog implementation
The `RelationalCatalog` supports relational database like metadata
operations,
@@ -50,6 +51,8 @@ class RelationalCatalog(
catalog is under the metalake.
"""
+ PRIVILEGES: Final[str] = "privileges"
+
def __init__(
self,
catalog_namespace: Namespace,
@@ -188,24 +191,96 @@ class RelationalCatalog(
for ident in entity_list_resp.identifiers()
]
- def load_table(self, identifier: NameIdentifier) -> Table:
+ @overload
+ def load_table(self, identifier: NameIdentifier) -> Table: ...
+
+ @overload
+ def load_table( # pylint: disable=arguments-differ
+ self, identifier: NameIdentifier, required_privilege_names:
set[Privilege.Name]
+ ) -> Table: ...
+
+ def load_table(
+ self,
+ identifier: NameIdentifier,
+ required_privilege_names: Optional[set[Privilege.Name]] = None,
+ ) -> Table:
self._check_table_name_identifier(identifier)
full_namespace = self._get_table_full_namespace(identifier.namespace())
+ query_params = (
+ {
+ RelationalCatalog.PRIVILEGES: ",".join(
+ sorted(priv.name for priv in required_privilege_names)
+ )
+ }
+ if required_privilege_names
+ else None
+ )
+
resp = self.rest_client.get(
f"{self._format_table_request_path(full_namespace)}"
f"/{encode_string(identifier.name())}",
+ params=query_params,
error_handler=TABLE_ERROR_HANDLER,
)
table_resp = TableResponse.from_json(resp.body, infer_missing=True)
table_resp.validate()
return RelationalTable(full_namespace, table_resp.table(),
self.rest_client)
- # TODO: We shall implement the following methods after integration tests
for relational table
def drop_table(self, identifier: NameIdentifier) -> bool:
- raise NotImplementedError("Drop table is not implemented yet.")
+ """Drop the table with specified identifier.
+
+ Args:
+ identifier (NameIdentifier):
+ The identifier of the table, which should be "schema.table"
format.
+
+ Returns:
+ bool:
+ `True` if the table is dropped successfully, `False` if the
table does not exist.
+ """
+
+ return self._drop_table(identifier, purge=False)
def alter_table(self, identifier: NameIdentifier, *changes) -> Table:
- raise NotImplementedError("Alter table is not implemented yet.")
+ self._check_table_name_identifier(identifier)
+ full_namespace = self._get_table_full_namespace(identifier.namespace())
+ updates_request = TableUpdatesRequest(
+ updates=[
+ DTOConverters.to_table_update_request(change) for change in
changes
+ ]
+ )
+ updates_request.validate()
+ resp = self.rest_client.put(
+ f"{self._format_table_request_path(full_namespace)}"
+ f"/{encode_string(identifier.name())}",
+ json=updates_request,
+ error_handler=TABLE_ERROR_HANDLER,
+ )
+ table_resp = TableResponse.from_json(resp.body, infer_missing=True)
+ table_resp.validate()
+ return RelationalTable(full_namespace, table_resp.table(),
self.rest_client)
def purge_table(self, identifier: NameIdentifier) -> bool:
- raise NotImplementedError("Purge table is not implemented yet.")
+ """Purge the table with specified identifier.
+
+ Args:
+ identifier (NameIdentifier):
+ The identifier of the table, which should be "schema.table"
format.
+
+ Returns:
+ bool:
+ `True` if the table is purged successfully, `False` if the
table does not exist.
+ """
+ return self._drop_table(identifier, purge=True)
+
+ def _drop_table(self, identifier: NameIdentifier, purge: bool) -> bool:
+ self._check_table_name_identifier(identifier)
+ full_namespace = self._get_table_full_namespace(identifier.namespace())
+ resp = self.rest_client.delete(
+ f"{self._format_table_request_path(full_namespace)}"
+ f"/{encode_string(identifier.name())}",
+ error_handler=TABLE_ERROR_HANDLER,
+ params={"purge": "true"} if purge else None,
+ )
+ drop_resp = DropResponse.from_json(resp.body, infer_missing=True)
+ drop_resp.validate()
+ return drop_resp.dropped()
diff --git
a/clients/client-python/gravitino/dto/requests/table_updates_request.py
b/clients/client-python/gravitino/dto/requests/table_updates_request.py
index a8b91ab868..7f5a56d79d 100644
--- a/clients/client-python/gravitino/dto/requests/table_updates_request.py
+++ b/clients/client-python/gravitino/dto/requests/table_updates_request.py
@@ -21,7 +21,7 @@ from dataclasses import dataclass, field
from dataclasses_json import config
-from gravitino.dto.requests.table_update_request import TableUpdateRequest
+from gravitino.dto.requests.table_update_request import TableUpdateRequestBase
from gravitino.rest.rest_message import RESTRequest
from gravitino.utils.precondition import Precondition
@@ -30,11 +30,11 @@ from gravitino.utils.precondition import Precondition
class TableUpdatesRequest(RESTRequest):
"""Represents a request to update a table."""
- _updates: list[TableUpdateRequest] = field(
+ _updates: list[TableUpdateRequestBase] = field(
metadata=config(field_name="updates"), default_factory=list
)
- def __init__(self, updates: list[TableUpdateRequest]) -> None:
+ def __init__(self, updates: list[TableUpdateRequestBase]) -> None:
self._updates = updates
def validate(self) -> None:
diff --git a/clients/client-python/gravitino/dto/util/dto_converters.py
b/clients/client-python/gravitino/dto/util/dto_converters.py
index 16fcece621..1aa128e179 100644
--- a/clients/client-python/gravitino/dto/util/dto_converters.py
+++ b/clients/client-python/gravitino/dto/util/dto_converters.py
@@ -42,6 +42,18 @@ from gravitino.api.rel.partitions.list_partition import
ListPartition
from gravitino.api.rel.partitions.partition import Partition
from gravitino.api.rel.partitions.range_partition import RangePartition
from gravitino.api.rel.table import Table
+from gravitino.api.rel.table_change import (
+ AddColumn,
+ DeleteColumn,
+ RenameColumn,
+ TableChange,
+ UpdateColumnAutoIncrement,
+ UpdateColumnComment,
+ UpdateColumnDefaultValue,
+ UpdateColumnNullability,
+ UpdateColumnPosition,
+ UpdateColumnType,
+)
from gravitino.api.rel.types.types import Types
from gravitino.dto.rel.column_dto import ColumnDTO
from gravitino.dto.rel.distribution_dto import DistributionDTO
@@ -77,6 +89,10 @@ from gravitino.dto.rel.partitions.partition_dto import
PartitionDTO
from gravitino.dto.rel.partitions.range_partition_dto import RangePartitionDTO
from gravitino.dto.rel.sort_order_dto import SortOrderDTO
from gravitino.dto.rel.table_dto import TableDTO
+from gravitino.dto.requests.table_update_request import (
+ TableUpdateRequest,
+ TableUpdateRequestBase,
+)
from gravitino.exceptions.base import IllegalArgumentException
@@ -643,3 +659,146 @@ class DTOConverters:
if not dtos:
return []
return [DTOConverters.to_dto(dto) for dto in dtos]
+
+ @singledispatchmethod
+ @staticmethod
+ def _to_column_update_request(change) -> TableUpdateRequestBase:
+ raise IllegalArgumentException(
+ f"Unknown column change type: {change.__class__.__name__}"
+ )
+
+ @_to_column_update_request.register
+ @staticmethod
+ def _(change: AddColumn) -> TableUpdateRequestBase:
+ default_value = (
+ None
+ if change.get_default_value() is None
+ or change.get_default_value() == Column.DEFAULT_VALUE_NOT_SET
+ else DTOConverters.to_function_arg(change.get_default_value())
+ )
+ return TableUpdateRequest.AddTableColumnRequest(
+ _field_name=change.field_name(),
+ _data_type=change.get_data_type(),
+ _comment=change.get_comment(),
+ _position=change.get_position(),
+ _nullable=change.is_nullable(),
+ _auto_increment=change.is_auto_increment(),
+ _default_value=default_value,
+ )
+
+ @_to_column_update_request.register
+ @staticmethod
+ def _(change: RenameColumn) -> TableUpdateRequestBase:
+ return TableUpdateRequest.RenameTableColumnRequest(
+ _old_field_name=change.field_name(),
+ _new_field_name=change.get_new_name(),
+ )
+
+ @_to_column_update_request.register
+ @staticmethod
+ def _(change: UpdateColumnDefaultValue) -> TableUpdateRequestBase:
+ return TableUpdateRequest.UpdateTableColumnDefaultValueRequest(
+ _field_name=change.field_name(),
+ _new_default_value=DTOConverters.to_function_arg(
+ change.get_new_default_value()
+ ),
+ )
+
+ @_to_column_update_request.register
+ @staticmethod
+ def _(change: UpdateColumnType) -> TableUpdateRequestBase:
+ return TableUpdateRequest.UpdateTableColumnTypeRequest(
+ _field_name=change.field_name(),
+ _new_type=change.get_new_data_type(),
+ )
+
+ @_to_column_update_request.register
+ @staticmethod
+ def _(change: UpdateColumnComment) -> TableUpdateRequestBase:
+ return TableUpdateRequest.UpdateTableColumnCommentRequest(
+ _field_name=change.field_name(),
+ _new_comment=change.get_new_comment(),
+ )
+
+ @_to_column_update_request.register
+ @staticmethod
+ def _(change: UpdateColumnPosition) -> TableUpdateRequestBase:
+ return TableUpdateRequest.UpdateTableColumnPositionRequest(
+ _field_name=change.field_name(),
+ _new_position=change.get_position(),
+ )
+
+ @_to_column_update_request.register
+ @staticmethod
+ def _(change: DeleteColumn) -> TableUpdateRequestBase:
+ return TableUpdateRequest.DeleteTableColumnRequest(
+ _field_name=change.field_name(),
+ _if_exists=change.get_if_exists(),
+ )
+
+ @_to_column_update_request.register
+ @staticmethod
+ def _(change: UpdateColumnNullability) -> TableUpdateRequestBase:
+ return TableUpdateRequest.UpdateTableColumnNullabilityRequest(
+ _field_name=change.field_name(),
+ _nullable=change.get_nullable(),
+ )
+
+ @_to_column_update_request.register
+ @staticmethod
+ def _(change: UpdateColumnAutoIncrement) -> TableUpdateRequestBase:
+ return TableUpdateRequest.UpdateColumnAutoIncrementRequest(
+ _field_name=change.field_name(),
+ _auto_increment=change.is_auto_increment(),
+ )
+
+ @staticmethod
+ def to_table_update_request(change) -> TableUpdateRequestBase:
+ if isinstance(change, TableChange.RenameTable):
+ rename_table = cast(TableChange.RenameTable, change)
+ return TableUpdateRequest.RenameTableRequest(
+ _new_name=rename_table.get_new_name(),
+ _new_schema_name=rename_table.get_new_schema_name(),
+ )
+
+ if isinstance(change, TableChange.UpdateComment):
+ return TableUpdateRequest.UpdateTableCommentRequest(
+ _new_comment=cast(TableChange.UpdateComment,
change).get_new_comment()
+ )
+
+ if isinstance(change, TableChange.SetProperty):
+ set_property = cast(TableChange.SetProperty, change)
+ return TableUpdateRequest.SetTablePropertyRequest(
+ _prop=set_property.get_property(),
+ _prop_value=set_property.get_value(),
+ )
+
+ if isinstance(change, TableChange.RemoveProperty):
+ remove_property = cast(TableChange.RemoveProperty, change)
+ return TableUpdateRequest.RemoveTablePropertyRequest(
+ _property=remove_property.get_property()
+ )
+
+ if isinstance(change, TableChange.ColumnChange):
+ return DTOConverters._to_column_update_request(change)
+
+ if isinstance(change, TableChange.AddIndex):
+ add_index = cast(TableChange.AddIndex, change)
+ return TableUpdateRequest.AddTableIndexRequest(
+ Indexes.of(
+ add_index.get_type(),
+ add_index.get_name(),
+ add_index.get_field_names(),
+ ),
+ )
+
+ if isinstance(change, TableChange.DeleteIndex):
+ delete_index = cast(TableChange.DeleteIndex, change)
+ return TableUpdateRequest.DeleteTableIndexRequest(
+ _name=delete_index.get_name(),
+ _if_exists=delete_index.is_if_exists(),
+ )
+
+ raise IllegalArgumentException(
+ f"Unknown change type: {change.__class__.__name__}"
+ )
diff --git
a/clients/client-python/tests/unittests/dto/util/test_dto_converters_to_table_update_request.py
b/clients/client-python/tests/unittests/dto/util/test_dto_converters_to_table_update_request.py
new file mode 100644
index 0000000000..e07e11923c
--- /dev/null
+++
b/clients/client-python/tests/unittests/dto/util/test_dto_converters_to_table_update_request.py
@@ -0,0 +1,288 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import unittest
+from typing import cast
+
+from gravitino.api.rel.expressions.literals.literals import Literals
+from gravitino.api.rel.indexes.index import Index
+from gravitino.api.rel.table_change import TableChange
+from gravitino.api.rel.types.types import Types
+from gravitino.dto.rel.expressions.literal_dto import LiteralDTO
+from gravitino.dto.requests.table_update_request import TableUpdateRequest
+from gravitino.dto.util.dto_converters import DTOConverters
+from gravitino.exceptions.base import IllegalArgumentException
+
+
+class TestDTOConvertersToTableUpdateRequest(unittest.TestCase):
+ def test_to_table_update_request_add_column_with_default_value(self):
+ """Tests the conversion of an add column change with a default value
to a TableUpdateRequest."""
+ add_column = TableChange.add_column(
+ field_name=["new_col"],
+ data_type=Types.IntegerType.get(),
+ comment="test column",
+ nullable=False,
+ auto_increment=True,
+ default_value=Literals.integer_literal(10),
+ )
+ result = cast(
+ TableUpdateRequest.AddTableColumnRequest,
+ DTOConverters.to_table_update_request(add_column),
+ )
+ self.assertIsInstance(result, TableUpdateRequest.AddTableColumnRequest)
+ add_column_request = cast(TableUpdateRequest.AddTableColumnRequest,
result)
+ self.assertListEqual(add_column_request.field_name, ["new_col"])
+ self.assertEqual(add_column_request.data_type, Types.IntegerType.get())
+ self.assertEqual(add_column_request.comment, "test column")
+ self.assertIsNone(add_column_request.position)
+ self.assertFalse(add_column_request.is_nullable)
+ self.assertTrue(add_column_request.is_auto_increment)
+ self.assertIsNotNone(add_column_request.default_value)
+ self.assertIsInstance(add_column_request.default_value, LiteralDTO)
+ default_value_dto = cast(LiteralDTO, add_column_request.default_value)
+ self.assertEqual(default_value_dto.data_type(),
Types.IntegerType.get())
+ self.assertEqual(default_value_dto.value(), "10")
+
+ def test_to_table_update_request_add_column_without_default_value(self):
+ """Tests the conversion of an add column change without a default
value to a TableUpdateRequest."""
+ add_column = TableChange.add_column(
+ field_name=["new_col"],
+ data_type=Types.StringType.get(),
+ )
+ result = DTOConverters.to_table_update_request(add_column)
+ self.assertIsInstance(result, TableUpdateRequest.AddTableColumnRequest)
+ add_column_request = cast(TableUpdateRequest.AddTableColumnRequest,
result)
+ self.assertIsNone(add_column_request.default_value)
+
+ def test_to_table_update_request_unsupported_change(self):
+ """Tests that an unsupported table change raises an
IllegalArgumentException."""
+ with self.assertRaisesRegex(IllegalArgumentException, "Unknown change
type"):
+ DTOConverters.to_table_update_request("invalid_change")
+
+ def test_to_table_update_request_rename_column(self):
+ """Tests the conversion of a rename column change to a
TableUpdateRequest."""
+ rename_column = TableChange.rename_column(
+ field_name=["old_col"],
+ new_name="new_col",
+ )
+ result = DTOConverters.to_table_update_request(rename_column)
+ self.assertIsInstance(result,
TableUpdateRequest.RenameTableColumnRequest)
+ rename_request = cast(TableUpdateRequest.RenameTableColumnRequest,
result)
+ self.assertListEqual(
+ rename_request._old_field_name, # pylint: disable=protected-access
+ ["old_col"],
+ )
+ self.assertEqual(
+ rename_request._new_field_name, # pylint: disable=protected-access
+ "new_col",
+ )
+
+ def test_to_table_update_request_update_column_default_value_full(self):
+ """Tests the conversion of an update column default value change to a
TableUpdateRequest."""
+ literals = [
+ Literals.integer_literal(42),
+ Literals.float_literal(3.14),
+ Literals.string_literal("newDefaultValue"),
+ Literals.NULL,
+ ]
+ for literal in literals:
+ update_default = TableChange.update_column_default_value(
+ field_name=["col"],
+ new_default_value=literal,
+ )
+ result = DTOConverters.to_table_update_request(update_default)
+ self.assertIsInstance(
+ result, TableUpdateRequest.UpdateTableColumnDefaultValueRequest
+ )
+ update_request = cast(
+ TableUpdateRequest.UpdateTableColumnDefaultValueRequest, result
+ )
+ self.assertListEqual(update_request.field_name, ["col"])
+ self.assertIsInstance(update_request.new_default_value, LiteralDTO)
+ integer_literal_dto = cast(LiteralDTO,
update_request.new_default_value)
+ self.assertEqual(integer_literal_dto.data_type(),
literal.data_type())
+ self.assertEqual(
+ integer_literal_dto.value(),
+ "NULL" if literal.value() is None else str(literal.value()),
+ )
+
+ def test_to_table_update_request_update_column_type(self):
+ """Tests the conversion of an update column type change to a
TableUpdateRequest."""
+ column_types = [
+ Types.IntegerType.get(),
+ Types.FloatType.get(),
+ Types.StringType.get(),
+ Types.BooleanType.get(),
+ Types.NullType.get(),
+ ]
+ for new_column_type in column_types:
+ update_type = TableChange.update_column_type(
+ field_name=["col"],
+ new_data_type=new_column_type,
+ )
+ result = DTOConverters.to_table_update_request(update_type)
+ self.assertIsInstance(
+ result, TableUpdateRequest.UpdateTableColumnTypeRequest
+ )
+ update_request = cast(
+ TableUpdateRequest.UpdateTableColumnTypeRequest, result
+ )
+ self.assertListEqual(update_request.field_name, ["col"])
+ self.assertEqual(update_request.new_type, new_column_type)
+
+ def test_to_table_update_request_update_column_comment(self):
+ """Tests the conversion of an update column comment change to a
TableUpdateRequest."""
+ update_comment = TableChange.update_column_comment(
+ field_name=["col"],
+ new_comment="new comment",
+ )
+ result = DTOConverters.to_table_update_request(update_comment)
+ self.assertIsInstance(
+ result, TableUpdateRequest.UpdateTableColumnCommentRequest
+ )
+ update_request = cast(
+ TableUpdateRequest.UpdateTableColumnCommentRequest, result
+ )
+ self.assertListEqual(update_request.field_name, ["col"])
+ self.assertEqual(update_request.new_comment, "new comment")
+
+ def test_to_table_update_request_update_column_position(self):
+ """Tests the conversion of an update column position change with a
ColumnPosition."""
+ positions = [
+ TableChange.ColumnPosition.first(),
+ TableChange.ColumnPosition.default_pos(),
+ TableChange.ColumnPosition.after("ref_col"),
+ ]
+ for position in positions:
+ update_position = TableChange.update_column_position(
+ field_name=["col"],
+ new_position=position,
+ )
+ result = DTOConverters.to_table_update_request(update_position)
+ self.assertIsInstance(
+ result, TableUpdateRequest.UpdateTableColumnPositionRequest
+ )
+ update_request = cast(
+ TableUpdateRequest.UpdateTableColumnPositionRequest, result
+ )
+ self.assertListEqual(update_request.field_name, ["col"])
+ self.assertEqual(str(update_request.new_position), str(position))
+
+ def test_to_table_update_request_delete_column(self):
+ """Tests the conversion of a delete column change to a
TableUpdateRequest."""
+
+ for exists in (True, False):
+ delete_column = TableChange.delete_column(
+ field_name=["col"],
+ if_exists=exists,
+ )
+ result = DTOConverters.to_table_update_request(delete_column)
+ self.assertIsInstance(result,
TableUpdateRequest.DeleteTableColumnRequest)
+ delete_request = cast(TableUpdateRequest.DeleteTableColumnRequest,
result)
+ self.assertListEqual(delete_request.field_name, ["col"])
+ self.assertEqual(delete_request.if_exists, exists)
+
+ def test_to_table_update_request_update_column_nullability(self):
+ """Tests the conversion of an update column nullability change to a
TableUpdateRequest."""
+ for nullable in (True, False):
+ update_nullability = TableChange.update_column_nullability(
+ field_name=["col"],
+ nullable=nullable,
+ )
+ result = DTOConverters.to_table_update_request(update_nullability)
+ self.assertIsInstance(
+ result, TableUpdateRequest.UpdateTableColumnNullabilityRequest
+ )
+ update_request = cast(
+ TableUpdateRequest.UpdateTableColumnNullabilityRequest, result
+ )
+ self.assertListEqual(update_request.field_name, ["col"])
+ self.assertEqual(update_request.nullable, nullable)
+
+ def test_to_table_update_request_update_column_auto_increment(self):
+ """Tests the conversion of an update column auto increment change to a
TableUpdateRequest."""
+ for auto_increment in (True, False):
+ update_auto_increment = TableChange.update_column_auto_increment(
+ field_name=["col"],
+ auto_increment=auto_increment,
+ )
+ result =
DTOConverters.to_table_update_request(update_auto_increment)
+ self.assertIsInstance(
+ result, TableUpdateRequest.UpdateColumnAutoIncrementRequest
+ )
+ update_request = cast(
+ TableUpdateRequest.UpdateColumnAutoIncrementRequest, result
+ )
+ self.assertListEqual(update_request.field_name, ["col"])
+ self.assertEqual(update_request.auto_increment, auto_increment)
+
+ def test_to_table_update_request_rename_table(self):
+ """Tests the conversion of a rename table change to a
TableUpdateRequest."""
+ rename_table = TableChange.rename("new_table_name")
+ result = DTOConverters.to_table_update_request(rename_table)
+ self.assertIsInstance(result, TableUpdateRequest.RenameTableRequest)
+ rename_request = cast(TableUpdateRequest.RenameTableRequest, result)
+ self.assertEqual(rename_request.new_name, "new_table_name")
+ self.assertIsNone(rename_request.new_schema_name)
+
+ def test_to_table_update_request_update_comment(self):
+ """Tests the conversion of an update comment change to a
TableUpdateRequest."""
+ update_comment = TableChange.update_comment("new comment")
+ result = DTOConverters.to_table_update_request(update_comment)
+ self.assertIsInstance(result,
TableUpdateRequest.UpdateTableCommentRequest)
+ comment_request = cast(TableUpdateRequest.UpdateTableCommentRequest,
result)
+ self.assertEqual(comment_request.new_comment, "new comment")
+
+ def test_to_table_update_request_set_property(self):
+ """Tests the conversion of a set property change to a
TableUpdateRequest."""
+ set_property = TableChange.set_property("key", "value")
+ result = DTOConverters.to_table_update_request(set_property)
+ self.assertIsInstance(result,
TableUpdateRequest.SetTablePropertyRequest)
+ property_request = cast(TableUpdateRequest.SetTablePropertyRequest,
result)
+ self.assertEqual(property_request.prop, "key")
+ self.assertEqual(property_request.prop_value, "value")
+
+ def test_to_table_update_request_remove_property(self):
+ """Tests the conversion of a remove property change to a
TableUpdateRequest."""
+ remove_property = TableChange.remove_property("key")
+ result = DTOConverters.to_table_update_request(remove_property)
+ self.assertIsInstance(result,
TableUpdateRequest.RemoveTablePropertyRequest)
+ property_request = cast(TableUpdateRequest.RemoveTablePropertyRequest,
result)
+ self.assertEqual(property_request.property, "key")
+
+ def test_to_table_update_request_add_index(self):
+ """Tests the conversion of an add index change to a
TableUpdateRequest."""
+
+ add_index = TableChange.add_index(
+ index_type=Index.IndexType.PRIMARY_KEY,
+ name="pk_index",
+ field_names=[["id"]],
+ )
+ result = DTOConverters.to_table_update_request(add_index)
+ self.assertIsInstance(result, TableUpdateRequest.AddTableIndexRequest)
+ index_request = cast(TableUpdateRequest.AddTableIndexRequest, result)
+ self.assertEqual(index_request.index.type(),
Index.IndexType.PRIMARY_KEY)
+ self.assertEqual(index_request.index.name(), "pk_index")
+
+ def test_to_table_update_request_delete_index(self):
+ """Tests the conversion of a delete index change to a
TableUpdateRequest."""
+ delete_index = TableChange.delete_index("idx_name", if_exists=False)
+ result = DTOConverters.to_table_update_request(delete_index)
+ self.assertIsInstance(result,
TableUpdateRequest.DeleteTableIndexRequest)
+ index_request = cast(TableUpdateRequest.DeleteTableIndexRequest,
result)
+ self.assertEqual(index_request.name, "idx_name")
+ self.assertFalse(index_request.if_exists)
diff --git a/clients/client-python/tests/unittests/test_relational_catalog.py
b/clients/client-python/tests/unittests/test_relational_catalog.py
index dedf92d0d0..abb80509b9 100644
--- a/clients/client-python/tests/unittests/test_relational_catalog.py
+++ b/clients/client-python/tests/unittests/test_relational_catalog.py
@@ -19,10 +19,13 @@ import unittest
from http.client import HTTPResponse
from unittest.mock import Mock, patch
+from gravitino.api.authorization.privileges import Privilege
+from gravitino.api.rel.table_change import TableChange
from gravitino.client.relational_catalog import RelationalCatalog
from gravitino.dto.audit_dto import AuditDTO
from gravitino.dto.rel.distribution_dto import DistributionDTO
from gravitino.dto.rel.table_dto import TableDTO
+from gravitino.dto.responses.drop_response import DropResponse
from gravitino.dto.responses.entity_list_response import EntityListResponse
from gravitino.dto.responses.table_response import TableResponse
from gravitino.dto.util.dto_converters import DTOConverters
@@ -228,6 +231,25 @@ class TestRelationalCatalog(unittest.TestCase):
table = self.catalog.load_table(self.table_identifier)
self.assertEqual(table.name(), self.table_dto.name())
+ def test_load_table_with_required_privilege_names(self):
+ resp_body = TableResponse(0, self.table_dto)
+ mock_resp = self._get_mock_http_resp(resp_body.to_json())
+
+ with patch(
+ "gravitino.utils.http_client.HTTPClient.get",
+ return_value=mock_resp,
+ ) as mock_get:
+ privileges = {Privilege.Name.SELECT_TABLE,
Privilege.Name.MODIFY_TABLE}
+ table = self.catalog.load_table(
+ self.table_identifier, required_privilege_names=privileges
+ )
+ self.assertEqual(table.name(), self.table_dto.name())
+ mock_get.assert_called_once()
+ call_args = mock_get.call_args
+ self.assertEqual(
+ call_args.kwargs["params"]["privileges"],
"MODIFY_TABLE,SELECT_TABLE"
+ )
+
def test_list_tables(self):
resp_body = EntityListResponse(
0,
@@ -276,3 +298,157 @@ class TestRelationalCatalog(unittest.TestCase):
):
with self.assertRaises(NoSuchSchemaException):
self.catalog.list_tables(namespace=Namespace.of("invalid_schema"))
+
+ def test_drop_table(self):
+ resp_body = DropResponse(0, True)
+ mock_resp = self._get_mock_http_resp(resp_body.to_json())
+
+ with patch(
+ "gravitino.utils.http_client.HTTPClient.delete",
+ return_value=mock_resp,
+ ) as mock_delete:
+ is_dropped = self.catalog.drop_table(self.table_identifier)
+ self.assertTrue(is_dropped)
+ mock_delete.assert_called_once()
+ call_args = mock_delete.call_args
+ self.assertIsNone(call_args.kwargs.get("params"))
+
+ def test_purge_table(self):
+ resp_body = DropResponse(0, True)
+ mock_resp = self._get_mock_http_resp(resp_body.to_json())
+
+ with patch(
+ "gravitino.utils.http_client.HTTPClient.delete",
+ return_value=mock_resp,
+ ) as mock_delete:
+ is_dropped = self.catalog.purge_table(self.table_identifier)
+ self.assertTrue(is_dropped)
+ mock_delete.assert_called_once()
+ call_args = mock_delete.call_args
+ self.assertIn("params", call_args.kwargs)
+ self.assertEqual(call_args.kwargs["params"], {"purge": "true"})
+
+ def test_alter_table(self):
+ resp_body = TableResponse(0, self.table_dto)
+ mock_resp = self._get_mock_http_resp(resp_body.to_json())
+
+ with patch(
+ "gravitino.utils.http_client.HTTPClient.put",
+ return_value=mock_resp,
+ ):
+ table = self.catalog.alter_table(
+ self.table_identifier,
+ TableChange.update_comment("Updated comment"),
+ )
+ self.assertEqual(table.name(), self.table_dto.name())
+
+ def test_alter_table_not_exists(self):
+ with (
+ patch(
+ "gravitino.utils.http_client.HTTPClient.put",
+ side_effect=NoSuchTableException("Table not found"),
+ ),
+ self.assertRaises(NoSuchTableException),
+ ):
+ self.catalog.alter_table(
+ self.table_identifier,
+ TableChange.update_comment("Updated comment"),
+ )
+
+ def test_alter_table_set_property(self):
+ resp_body = TableResponse(0, self.table_dto)
+ mock_resp = self._get_mock_http_resp(resp_body.to_json())
+
+ with patch(
+ "gravitino.utils.http_client.HTTPClient.put",
+ return_value=mock_resp,
+ ):
+ table = self.catalog.alter_table(
+ self.table_identifier,
+ TableChange.set_property("key", "value"),
+ )
+ self.assertEqual(table.name(), self.table_dto.name())
+
+ def test_alter_table_rename(self):
+ resp_body = TableResponse(0, self.table_dto)
+ mock_resp = self._get_mock_http_resp(resp_body.to_json())
+
+ with patch(
+ "gravitino.utils.http_client.HTTPClient.put",
+ return_value=mock_resp,
+ ):
+ table = self.catalog.alter_table(
+ self.table_identifier,
+ TableChange.rename("new_table_name"),
+ )
+ self.assertEqual(table.name(), self.table_dto.name())
+
+ def test_alter_table_remove_property(self):
+ resp_body = TableResponse(0, self.table_dto)
+ mock_resp = self._get_mock_http_resp(resp_body.to_json())
+
+ with patch(
+ "gravitino.utils.http_client.HTTPClient.put",
+ return_value=mock_resp,
+ ):
+ table = self.catalog.alter_table(
+ self.table_identifier,
+ TableChange.remove_property("key"),
+ )
+ self.assertEqual(table.name(), self.table_dto.name())
+
+ def test_alter_table_rename_column(self):
+ resp_body = TableResponse(0, self.table_dto)
+ mock_resp = self._get_mock_http_resp(resp_body.to_json())
+
+ with patch(
+ "gravitino.utils.http_client.HTTPClient.put",
+ return_value=mock_resp,
+ ):
+ table = self.catalog.alter_table(
+ self.table_identifier,
+ TableChange.rename_column(["id"], "new_id"),
+ )
+ self.assertEqual(table.name(), self.table_dto.name())
+
+ def test_alter_table_update_column_comment(self):
+ resp_body = TableResponse(0, self.table_dto)
+ mock_resp = self._get_mock_http_resp(resp_body.to_json())
+
+ with patch(
+ "gravitino.utils.http_client.HTTPClient.put",
+ return_value=mock_resp,
+ ):
+ table = self.catalog.alter_table(
+ self.table_identifier,
+ TableChange.update_column_comment(["id"], "new comment"),
+ )
+ self.assertEqual(table.name(), self.table_dto.name())
+
+ def test_alter_table_delete_column(self):
+ resp_body = TableResponse(0, self.table_dto)
+ mock_resp = self._get_mock_http_resp(resp_body.to_json())
+
+ with patch(
+ "gravitino.utils.http_client.HTTPClient.put",
+ return_value=mock_resp,
+ ):
+ table = self.catalog.alter_table(
+ self.table_identifier,
+ TableChange.delete_column(["id"], if_exists=True),
+ )
+ self.assertEqual(table.name(), self.table_dto.name())
+
+ def test_alter_table_update_column_nullability(self):
+ resp_body = TableResponse(0, self.table_dto)
+ mock_resp = self._get_mock_http_resp(resp_body.to_json())
+
+ with patch(
+ "gravitino.utils.http_client.HTTPClient.put",
+ return_value=mock_resp,
+ ):
+ table = self.catalog.alter_table(
+ self.table_identifier,
+ TableChange.update_column_nullability(["id"], nullable=True),
+ )
+ self.assertEqual(table.name(), self.table_dto.name())