This is an automated email from the ASF dual-hosted git repository.

o-nikolas pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 25b96669630 Add MwaaServerlessDeleteWorkflowOperator (#66891)
25b96669630 is described below

commit 25b96669630f9d8503d7bd3609bc3a0c1a6544a7
Author: Karthik Seshadri <[email protected]>
AuthorDate: Thu May 14 17:10:25 2026 -0700

    Add MwaaServerlessDeleteWorkflowOperator (#66891)
---
 .../amazon/docs/operators/mwaa_serverless.rst      | 14 ++++++
 .../amazon/aws/operators/mwaa_serverless.py        | 43 +++++++++++++++++
 .../system/amazon/aws/example_mwaa_serverless.py   | 25 +++++-----
 .../amazon/aws/operators/test_mwaa_serverless.py   | 56 ++++++++++++++++++++++
 4 files changed, 127 insertions(+), 11 deletions(-)

diff --git a/providers/amazon/docs/operators/mwaa_serverless.rst b/providers/amazon/docs/operators/mwaa_serverless.rst
index b64eb8b0b4f..ba2810a5267 100644
--- a/providers/amazon/docs/operators/mwaa_serverless.rst
+++ b/providers/amazon/docs/operators/mwaa_serverless.rst
@@ -96,3 +96,17 @@ To stop a running Amazon MWAA Serverless workflow run, use
     :dedent: 4
     :start-after: [START howto_operator_mwaa_serverless_stop_workflow_run]
     :end-before: [END howto_operator_mwaa_serverless_stop_workflow_run]
+
+.. _howto/operator:MwaaServerlessDeleteWorkflowOperator:
+
+Delete a Workflow
+-----------------
+
+To delete an Amazon MWAA Serverless workflow, use
+:class:`~airflow.providers.amazon.aws.operators.mwaa_serverless.MwaaServerlessDeleteWorkflowOperator`.
+
+.. exampleinclude:: /../../amazon/tests/system/amazon/aws/example_mwaa_serverless.py
+    :language: python
+    :dedent: 4
+    :start-after: [START howto_operator_mwaa_serverless_delete_workflow]
+    :end-before: [END howto_operator_mwaa_serverless_delete_workflow]
diff --git a/providers/amazon/src/airflow/providers/amazon/aws/operators/mwaa_serverless.py b/providers/amazon/src/airflow/providers/amazon/aws/operators/mwaa_serverless.py
index 25b3b038634..cd629505bed 100644
--- a/providers/amazon/src/airflow/providers/amazon/aws/operators/mwaa_serverless.py
+++ b/providers/amazon/src/airflow/providers/amazon/aws/operators/mwaa_serverless.py
@@ -215,6 +215,49 @@ class MwaaServerlessUpdateWorkflowOperator(AwsBaseOperator[AwsBaseHook]):
         return workflow_arn
 
 
+class MwaaServerlessDeleteWorkflowOperator(AwsBaseOperator[AwsBaseHook]):
+    """
+    Delete an Amazon MWAA Serverless workflow.
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:MwaaServerlessDeleteWorkflowOperator`
+
+    :param workflow_arn: The ARN of the workflow to delete. (templated)
+    :param workflow_version: Optional specific version to delete. If not specified,
+        all versions are deleted. (templated)
+    """
+
+    aws_hook_class = AwsBaseHook
+    template_fields: tuple[str, ...] = aws_template_fields("workflow_arn", "workflow_version")
+
+    def __init__(
+        self,
+        *,
+        workflow_arn: str,
+        workflow_version: str | None = None,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.workflow_arn = workflow_arn
+        self.workflow_version = workflow_version
+
+    @property
+    def _hook_parameters(self) -> dict[str, Any]:
+        return {**super()._hook_parameters, "client_type": "mwaa-serverless"}
+
+    def execute(self, context: Context) -> None:
+        self.log.info("Deleting MWAA Serverless workflow %s", self.workflow_arn)
+        kwargs: dict[str, Any] = prune_dict(
+            {
+                "WorkflowArn": self.workflow_arn,
+                "WorkflowVersion": self.workflow_version,
+            }
+        )
+        self.hook.conn.delete_workflow(**kwargs)
+        self.log.info("Workflow %s deleted.", self.workflow_arn)
+
+
 class MwaaServerlessStopWorkflowRunOperator(AwsBaseOperator[AwsBaseHook]):
     """
     Stop a running Amazon MWAA Serverless workflow run.
diff --git a/providers/amazon/tests/system/amazon/aws/example_mwaa_serverless.py b/providers/amazon/tests/system/amazon/aws/example_mwaa_serverless.py
index 52736f0b87a..d1d6f6bdcb1 100644
--- a/providers/amazon/tests/system/amazon/aws/example_mwaa_serverless.py
+++ b/providers/amazon/tests/system/amazon/aws/example_mwaa_serverless.py
@@ -20,6 +20,7 @@ from datetime import datetime
 
 from airflow.providers.amazon.aws.operators.mwaa_serverless import (
     MwaaServerlessCreateWorkflowOperator,
+    MwaaServerlessDeleteWorkflowOperator,
     MwaaServerlessStartWorkflowRunOperator,
     MwaaServerlessStopWorkflowRunOperator,
     MwaaServerlessUpdateWorkflowOperator,
@@ -36,9 +37,8 @@ from system.amazon.aws.utils import ENV_ID_KEY, SystemTestContextBuilder
 from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS
 
 if AIRFLOW_V_3_0_PLUS:
-    from airflow.sdk import TriggerRule, task
+    from airflow.sdk import TriggerRule
 else:
-    from airflow.decorators import task  # type: ignore[attr-defined,no-redef]
     from airflow.utils.trigger_rule import TriggerRule  # type: ignore[no-redef,attr-defined]
 
 DAG_ID = "example_mwaa_serverless"
@@ -62,14 +62,6 @@ systest_mwaa_serverless:
 sys_test_context_task = SystemTestContextBuilder().add_variable(ROLE_ARN_KEY).build()
 
 
-@task(trigger_rule=TriggerRule.ALL_DONE)
-def delete_workflow(workflow_arn: str):
-    """Delete the MWAA Serverless workflow."""
-    import boto3
-
-    boto3.client("mwaa-serverless").delete_workflow(WorkflowArn=workflow_arn)
-
-
 with DAG(
     dag_id=DAG_ID,
     schedule=None,
@@ -142,6 +134,14 @@ with DAG(
     )
     # [END howto_operator_mwaa_serverless_stop_workflow_run]
 
+    # [START howto_operator_mwaa_serverless_delete_workflow]
+    delete_workflow = MwaaServerlessDeleteWorkflowOperator(
+        task_id="delete_workflow",
+        workflow_arn=workflow_arn,
+        trigger_rule=TriggerRule.ALL_DONE,
+    )
+    # [END howto_operator_mwaa_serverless_delete_workflow]
+
     delete_bucket = S3DeleteBucketOperator(
         task_id="delete_bucket",
         bucket_name=bucket_name,
@@ -150,16 +150,19 @@ with DAG(
     )
 
     chain(
+        # TEST SETUP
         test_context,
         create_bucket,
         upload_workflow_yaml,
         workflow_arn,
+        # TEST BODY
         start_workflow,
         wait_for_run,
         update_workflow,
         start_workflow_2,
         stop_workflow_run,
-        delete_workflow(workflow_arn=workflow_arn),
+        # TEST TEARDOWN
+        delete_workflow,
         delete_bucket,
     )
 
diff --git a/providers/amazon/tests/unit/amazon/aws/operators/test_mwaa_serverless.py b/providers/amazon/tests/unit/amazon/aws/operators/test_mwaa_serverless.py
index 33f98abab1b..69397ebd15c 100644
--- a/providers/amazon/tests/unit/amazon/aws/operators/test_mwaa_serverless.py
+++ b/providers/amazon/tests/unit/amazon/aws/operators/test_mwaa_serverless.py
@@ -24,6 +24,7 @@ from botocore.exceptions import ClientError
 from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook
 from airflow.providers.amazon.aws.operators.mwaa_serverless import (
     MwaaServerlessCreateWorkflowOperator,
+    MwaaServerlessDeleteWorkflowOperator,
     MwaaServerlessStartWorkflowRunOperator,
     MwaaServerlessStopWorkflowRunOperator,
     MwaaServerlessUpdateWorkflowOperator,
@@ -220,6 +221,61 @@ class TestMwaaServerlessUpdateWorkflowOperator:
         validate_template_fields(self.operator)
 
 
+class TestMwaaServerlessDeleteWorkflowOperator:
+    def setup_method(self):
+        self.operator = MwaaServerlessDeleteWorkflowOperator(
+            task_id="delete_workflow",
+            workflow_arn=WORKFLOW_ARN,
+        )
+
+    @mock.patch.object(AwsBaseHook, "conn", new_callable=mock.PropertyMock)
+    def test_execute(self, mock_conn):
+        mock_client = mock.MagicMock()
+        mock_client.delete_workflow.return_value = {"WorkflowArn": WORKFLOW_ARN}
+        mock_conn.return_value = mock_client
+
+        result = self.operator.execute({})
+
+        mock_client.delete_workflow.assert_called_once_with(WorkflowArn=WORKFLOW_ARN)
+        assert result is None
+
+    @mock.patch.object(AwsBaseHook, "conn", new_callable=mock.PropertyMock)
+    def test_execute_with_version(self, mock_conn):
+        op = MwaaServerlessDeleteWorkflowOperator(
+            task_id="delete_workflow",
+            workflow_arn=WORKFLOW_ARN,
+            workflow_version="abc123def456abc123def456abc123de",
+        )
+        mock_client = mock.MagicMock()
+        mock_client.delete_workflow.return_value = {
+            "WorkflowArn": WORKFLOW_ARN,
+            "WorkflowVersion": "abc123def456abc123def456abc123de",
+        }
+        mock_conn.return_value = mock_client
+
+        result = op.execute({})
+
+        mock_client.delete_workflow.assert_called_once_with(
+            WorkflowArn=WORKFLOW_ARN,
+            WorkflowVersion="abc123def456abc123def456abc123de",
+        )
+        assert result is None
+
+    @mock.patch.object(AwsBaseHook, "conn", new_callable=mock.PropertyMock)
+    def test_execute_not_found(self, mock_conn):
+        mock_client = mock.MagicMock()
+        mock_client.delete_workflow.side_effect = ClientError(
+            {"Error": {"Code": "ResourceNotFoundException", "Message": "not found"}}, "DeleteWorkflow"
+        )
+        mock_conn.return_value = mock_client
+
+        with pytest.raises(ClientError, match="ResourceNotFoundException"):
+            self.operator.execute({})
+
+    def test_template_fields(self):
+        validate_template_fields(self.operator)
+
+
 class TestMwaaServerlessStopWorkflowRunOperator:
     def setup_method(self):
         self.operator = MwaaServerlessStopWorkflowRunOperator(

Reply via email to