This is an automated email from the ASF dual-hosted git repository.
vincbeck pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new c6cca1c93fd Add `S3TablesRenameTableOperator` (#66544)
c6cca1c93fd is described below
commit c6cca1c93fd188e170f91c5c6c5012c5406fa60a
Author: John Jackson <[email protected]>
AuthorDate: Fri May 8 10:12:28 2026 -0700
Add `S3TablesRenameTableOperator` (#66544)
---
providers/amazon/docs/operators/s3_tables.rst | 14 ++++
.../providers/amazon/aws/operators/s3_tables.py | 56 +++++++++++++++
.../tests/system/amazon/aws/example_s3_tables.py | 15 +++-
.../unit/amazon/aws/operators/test_s3_tables.py | 79 ++++++++++++++++++++++
4 files changed, 163 insertions(+), 1 deletion(-)
diff --git a/providers/amazon/docs/operators/s3_tables.rst b/providers/amazon/docs/operators/s3_tables.rst
index 7e10ec31277..7f0b6de7dff 100644
--- a/providers/amazon/docs/operators/s3_tables.rst
+++ b/providers/amazon/docs/operators/s3_tables.rst
@@ -103,6 +103,20 @@ To delete an Amazon S3 Tables table bucket, use
:start-after: [START howto_operator_s3tables_delete_table_bucket]
:end-before: [END howto_operator_s3tables_delete_table_bucket]
+.. _howto/operator:S3TablesRenameTableOperator:
+
+Rename a Table
+--------------
+
+To rename a table in an Amazon S3 Tables namespace, use
+:class:`~airflow.providers.amazon.aws.operators.s3_tables.S3TablesRenameTableOperator`.
+
+.. exampleinclude:: /../../amazon/tests/system/amazon/aws/example_s3_tables.py
+ :language: python
+ :dedent: 4
+ :start-after: [START howto_operator_s3tables_rename_table]
+ :end-before: [END howto_operator_s3tables_rename_table]
+
Reference
---------
diff --git a/providers/amazon/src/airflow/providers/amazon/aws/operators/s3_tables.py b/providers/amazon/src/airflow/providers/amazon/aws/operators/s3_tables.py
index 2f2b15bb04e..4b2b81572a3 100644
--- a/providers/amazon/src/airflow/providers/amazon/aws/operators/s3_tables.py
+++ b/providers/amazon/src/airflow/providers/amazon/aws/operators/s3_tables.py
@@ -327,3 +327,59 @@ class S3TablesCreateNamespaceOperator(AwsBaseOperator[S3TablesHook]):
raise
self.log.info("Namespace %s created.", self.namespace)
return self.namespace
+
+
+class S3TablesRenameTableOperator(AwsBaseOperator[S3TablesHook]):
+ """
+ Rename a table in an Amazon S3 Tables namespace.
+
+ .. seealso::
+ For more information on how to use this operator, take a look at the guide:
+ :ref:`howto/operator:S3TablesRenameTableOperator`
+
+ :param table_bucket_arn: The ARN of the table bucket. (templated)
+ :param namespace: The current namespace of the table. (templated)
+ :param table_name: The current name of the table. (templated)
+ :param new_name: The new name for the table. (templated)
+ :param new_namespace_name: Optional new namespace to move the table to. (templated)
+ :param version_token: Optional version token for optimistic concurrency. (templated)
+ """
+
+ template_fields: Sequence[str] = aws_template_fields(
+ "table_bucket_arn", "namespace", "table_name", "new_name", "new_namespace_name", "version_token"
+ )
+ aws_hook_class = S3TablesHook
+
+ def __init__(
+ self,
+ *,
+ table_bucket_arn: str,
+ namespace: str,
+ table_name: str,
+ new_name: str,
+ new_namespace_name: str | None = None,
+ version_token: str | None = None,
+ **kwargs,
+ ) -> None:
+ super().__init__(**kwargs)
+ self.table_bucket_arn = table_bucket_arn
+ self.namespace = namespace
+ self.table_name = table_name
+ self.new_name = new_name
+ self.new_namespace_name = new_namespace_name
+ self.version_token = version_token
+
+ def execute(self, context: Context) -> None:
+ self.log.info("Renaming table %s to %s", self.table_name, self.new_name)
+ kwargs: dict[str, Any] = prune_dict(
+ {
+ "tableBucketARN": self.table_bucket_arn,
+ "namespace": self.namespace,
+ "name": self.table_name,
+ "newName": self.new_name,
+ "newNamespaceName": self.new_namespace_name,
+ "versionToken": self.version_token,
+ }
+ )
+ self.hook.conn.rename_table(**kwargs)
+ self.log.info("Renamed table %s to %s", self.table_name, self.new_name)
diff --git a/providers/amazon/tests/system/amazon/aws/example_s3_tables.py b/providers/amazon/tests/system/amazon/aws/example_s3_tables.py
index d02705cc6ce..433e50e2fa0 100644
--- a/providers/amazon/tests/system/amazon/aws/example_s3_tables.py
+++ b/providers/amazon/tests/system/amazon/aws/example_s3_tables.py
@@ -25,6 +25,7 @@ from airflow.providers.amazon.aws.operators.s3_tables import (
S3TablesDeleteNamespaceOperator,
S3TablesDeleteTableBucketOperator,
S3TablesDeleteTableOperator,
+ S3TablesRenameTableOperator,
)
from airflow.providers.common.compat.sdk import DAG, chain
@@ -65,6 +66,7 @@ with DAG(
bucket_name = f"{env_id}-s3tables"
namespace = f"{env_id}_ns"
table_name = f"{env_id}_tbl"
+ renamed_table_name = f"{env_id}_tbl_renamed"
@task
def create_namespace(table_bucket_arn: str, namespace: str):
@@ -97,12 +99,22 @@ with DAG(
)
# [END howto_operator_s3tables_create_table]
+ # [START howto_operator_s3tables_rename_table]
+ rename_table = S3TablesRenameTableOperator(
+ task_id="rename_table",
+ table_bucket_arn=create_table_bucket.output,
+ namespace=namespace,
+ table_name=table_name,
+ new_name=renamed_table_name,
+ )
+ # [END howto_operator_s3tables_rename_table]
+
# [START howto_operator_s3tables_delete_table]
delete_table = S3TablesDeleteTableOperator(
task_id="delete_table",
table_bucket_arn=create_table_bucket.output,
namespace=namespace,
- table_name=table_name,
+ table_name=renamed_table_name,
trigger_rule=TriggerRule.ALL_DONE,
)
# [END howto_operator_s3tables_delete_table]
@@ -131,6 +143,7 @@ with DAG(
setup_namespace,
# TEST BODY
create_table,
+ rename_table,
# TEST TEARDOWN
delete_table,
delete_namespace,
diff --git a/providers/amazon/tests/unit/amazon/aws/operators/test_s3_tables.py b/providers/amazon/tests/unit/amazon/aws/operators/test_s3_tables.py
index 7943e69f906..47a51cec4ab 100644
--- a/providers/amazon/tests/unit/amazon/aws/operators/test_s3_tables.py
+++ b/providers/amazon/tests/unit/amazon/aws/operators/test_s3_tables.py
@@ -31,6 +31,7 @@ from airflow.providers.amazon.aws.operators.s3_tables import (
S3TablesDeleteNamespaceOperator,
S3TablesDeleteTableBucketOperator,
S3TablesDeleteTableOperator,
+ S3TablesRenameTableOperator,
)
from unit.amazon.aws.utils.test_template_fields import validate_template_fields
@@ -90,6 +91,31 @@ class TestS3TablesCreateTableOperator:
metadata=metadata,
)
+ @mock.patch.object(S3TablesHook, "conn", new_callable=mock.PropertyMock)
+ def test_execute_with_optional_args(self, mock_conn):
+ mock_client = mock.MagicMock()
+ mock_conn.return_value = mock_client
+
+ op = S3TablesRenameTableOperator(
+ task_id="rename_table_full",
+ table_bucket_arn=TABLE_BUCKET_ARN,
+ namespace=NAMESPACE,
+ table_name=TABLE_NAME,
+ new_name="new_table",
+ new_namespace_name="new_ns",
+ version_token="token123",
+ )
+ op.execute({})
+
+ mock_client.rename_table.assert_called_once_with(
+ tableBucketARN=TABLE_BUCKET_ARN,
+ namespace=NAMESPACE,
+ name=TABLE_NAME,
+ newName="new_table",
+ newNamespaceName="new_ns",
+ versionToken="token123",
+ )
+
def test_template_fields(self):
validate_template_fields(self.operator)
@@ -313,3 +339,56 @@ class TestS3TablesDeleteNamespaceOperator:
def test_template_fields(self):
validate_template_fields(self.operator)
+
+
+class TestS3TablesRenameTableOperator:
+ def setup_method(self):
+ self.operator = S3TablesRenameTableOperator(
+ task_id="rename_table",
+ table_bucket_arn=TABLE_BUCKET_ARN,
+ namespace=NAMESPACE,
+ table_name=TABLE_NAME,
+ new_name="new_table",
+ )
+
+ @mock.patch.object(S3TablesHook, "conn", new_callable=mock.PropertyMock)
+ def test_execute(self, mock_conn):
+ mock_client = mock.MagicMock()
+ mock_conn.return_value = mock_client
+
+ self.operator.execute({})
+
+ mock_client.rename_table.assert_called_once_with(
+ tableBucketARN=TABLE_BUCKET_ARN,
+ namespace=NAMESPACE,
+ name=TABLE_NAME,
+ newName="new_table",
+ )
+
+ @mock.patch.object(S3TablesHook, "conn", new_callable=mock.PropertyMock)
+ def test_execute_with_optional_args(self, mock_conn):
+ mock_client = mock.MagicMock()
+ mock_conn.return_value = mock_client
+
+ op = S3TablesRenameTableOperator(
+ task_id="rename_table_full",
+ table_bucket_arn=TABLE_BUCKET_ARN,
+ namespace=NAMESPACE,
+ table_name=TABLE_NAME,
+ new_name="new_table",
+ new_namespace_name="new_ns",
+ version_token="token123",
+ )
+ op.execute({})
+
+ mock_client.rename_table.assert_called_once_with(
+ tableBucketARN=TABLE_BUCKET_ARN,
+ namespace=NAMESPACE,
+ name=TABLE_NAME,
+ newName="new_table",
+ newNamespaceName="new_ns",
+ versionToken="token123",
+ )
+
+ def test_template_fields(self):
+ validate_template_fields(self.operator)