This is an automated email from the ASF dual-hosted git repository.
o-nikolas pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 25b96669630 Add MwaaServerlessDeleteWorkflowOperator (#66891)
25b96669630 is described below
commit 25b96669630f9d8503d7bd3609bc3a0c1a6544a7
Author: Karthik Seshadri <[email protected]>
AuthorDate: Thu May 14 17:10:25 2026 -0700
Add MwaaServerlessDeleteWorkflowOperator (#66891)
---
.../amazon/docs/operators/mwaa_serverless.rst | 14 ++++++
.../amazon/aws/operators/mwaa_serverless.py | 43 +++++++++++++++++
.../system/amazon/aws/example_mwaa_serverless.py | 25 +++++-----
.../amazon/aws/operators/test_mwaa_serverless.py | 56 ++++++++++++++++++++++
4 files changed, 127 insertions(+), 11 deletions(-)
diff --git a/providers/amazon/docs/operators/mwaa_serverless.rst b/providers/amazon/docs/operators/mwaa_serverless.rst
index b64eb8b0b4f..ba2810a5267 100644
--- a/providers/amazon/docs/operators/mwaa_serverless.rst
+++ b/providers/amazon/docs/operators/mwaa_serverless.rst
@@ -96,3 +96,17 @@ To stop a running Amazon MWAA Serverless workflow run, use
:dedent: 4
:start-after: [START howto_operator_mwaa_serverless_stop_workflow_run]
:end-before: [END howto_operator_mwaa_serverless_stop_workflow_run]
+
+.. _howto/operator:MwaaServerlessDeleteWorkflowOperator:
+
+Delete a Workflow
+-----------------
+
+To delete an Amazon MWAA Serverless workflow, use
+:class:`~airflow.providers.amazon.aws.operators.mwaa_serverless.MwaaServerlessDeleteWorkflowOperator`.
+
+.. exampleinclude:: /../../amazon/tests/system/amazon/aws/example_mwaa_serverless.py
+ :language: python
+ :dedent: 4
+ :start-after: [START howto_operator_mwaa_serverless_delete_workflow]
+ :end-before: [END howto_operator_mwaa_serverless_delete_workflow]
diff --git a/providers/amazon/src/airflow/providers/amazon/aws/operators/mwaa_serverless.py b/providers/amazon/src/airflow/providers/amazon/aws/operators/mwaa_serverless.py
index 25b3b038634..cd629505bed 100644
--- a/providers/amazon/src/airflow/providers/amazon/aws/operators/mwaa_serverless.py
+++ b/providers/amazon/src/airflow/providers/amazon/aws/operators/mwaa_serverless.py
@@ -215,6 +215,49 @@ class MwaaServerlessUpdateWorkflowOperator(AwsBaseOperator[AwsBaseHook]):
return workflow_arn
+class MwaaServerlessDeleteWorkflowOperator(AwsBaseOperator[AwsBaseHook]):
+ """
+ Delete an Amazon MWAA Serverless workflow.
+
+ .. seealso::
+ For more information on how to use this operator, take a look at the guide:
+ :ref:`howto/operator:MwaaServerlessDeleteWorkflowOperator`
+
+ :param workflow_arn: The ARN of the workflow to delete. (templated)
+ :param workflow_version: Optional specific version to delete. If not specified,
+ all versions are deleted. (templated)
+ """
+
+ aws_hook_class = AwsBaseHook
+ template_fields: tuple[str, ...] = aws_template_fields("workflow_arn", "workflow_version")
+
+ def __init__(
+ self,
+ *,
+ workflow_arn: str,
+ workflow_version: str | None = None,
+ **kwargs,
+ ) -> None:
+ super().__init__(**kwargs)
+ self.workflow_arn = workflow_arn
+ self.workflow_version = workflow_version
+
+ @property
+ def _hook_parameters(self) -> dict[str, Any]:
+ return {**super()._hook_parameters, "client_type": "mwaa-serverless"}
+
+ def execute(self, context: Context) -> None:
+ self.log.info("Deleting MWAA Serverless workflow %s", self.workflow_arn)
+ kwargs: dict[str, Any] = prune_dict(
+ {
+ "WorkflowArn": self.workflow_arn,
+ "WorkflowVersion": self.workflow_version,
+ }
+ )
+ self.hook.conn.delete_workflow(**kwargs)
+ self.log.info("Workflow %s deleted.", self.workflow_arn)
+
+
class MwaaServerlessStopWorkflowRunOperator(AwsBaseOperator[AwsBaseHook]):
"""
Stop a running Amazon MWAA Serverless workflow run.
diff --git a/providers/amazon/tests/system/amazon/aws/example_mwaa_serverless.py b/providers/amazon/tests/system/amazon/aws/example_mwaa_serverless.py
index 52736f0b87a..d1d6f6bdcb1 100644
--- a/providers/amazon/tests/system/amazon/aws/example_mwaa_serverless.py
+++ b/providers/amazon/tests/system/amazon/aws/example_mwaa_serverless.py
@@ -20,6 +20,7 @@ from datetime import datetime
from airflow.providers.amazon.aws.operators.mwaa_serverless import (
MwaaServerlessCreateWorkflowOperator,
+ MwaaServerlessDeleteWorkflowOperator,
MwaaServerlessStartWorkflowRunOperator,
MwaaServerlessStopWorkflowRunOperator,
MwaaServerlessUpdateWorkflowOperator,
@@ -36,9 +37,8 @@ from system.amazon.aws.utils import ENV_ID_KEY, SystemTestContextBuilder
from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS
if AIRFLOW_V_3_0_PLUS:
- from airflow.sdk import TriggerRule, task
+ from airflow.sdk import TriggerRule
else:
- from airflow.decorators import task # type: ignore[attr-defined,no-redef]
from airflow.utils.trigger_rule import TriggerRule # type: ignore[no-redef,attr-defined]
DAG_ID = "example_mwaa_serverless"
@@ -62,14 +62,6 @@ systest_mwaa_serverless:
sys_test_context_task = SystemTestContextBuilder().add_variable(ROLE_ARN_KEY).build()
-@task(trigger_rule=TriggerRule.ALL_DONE)
-def delete_workflow(workflow_arn: str):
- """Delete the MWAA Serverless workflow."""
- import boto3
-
- boto3.client("mwaa-serverless").delete_workflow(WorkflowArn=workflow_arn)
-
-
with DAG(
dag_id=DAG_ID,
schedule=None,
@@ -142,6 +134,14 @@ with DAG(
)
# [END howto_operator_mwaa_serverless_stop_workflow_run]
+ # [START howto_operator_mwaa_serverless_delete_workflow]
+ delete_workflow = MwaaServerlessDeleteWorkflowOperator(
+ task_id="delete_workflow",
+ workflow_arn=workflow_arn,
+ trigger_rule=TriggerRule.ALL_DONE,
+ )
+ # [END howto_operator_mwaa_serverless_delete_workflow]
+
delete_bucket = S3DeleteBucketOperator(
task_id="delete_bucket",
bucket_name=bucket_name,
@@ -150,16 +150,19 @@ with DAG(
)
chain(
+ # TEST SETUP
test_context,
create_bucket,
upload_workflow_yaml,
workflow_arn,
+ # TEST BODY
start_workflow,
wait_for_run,
update_workflow,
start_workflow_2,
stop_workflow_run,
- delete_workflow(workflow_arn=workflow_arn),
+ # TEST TEARDOWN
+ delete_workflow,
delete_bucket,
)
diff --git a/providers/amazon/tests/unit/amazon/aws/operators/test_mwaa_serverless.py b/providers/amazon/tests/unit/amazon/aws/operators/test_mwaa_serverless.py
index 33f98abab1b..69397ebd15c 100644
--- a/providers/amazon/tests/unit/amazon/aws/operators/test_mwaa_serverless.py
+++ b/providers/amazon/tests/unit/amazon/aws/operators/test_mwaa_serverless.py
@@ -24,6 +24,7 @@ from botocore.exceptions import ClientError
from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook
from airflow.providers.amazon.aws.operators.mwaa_serverless import (
MwaaServerlessCreateWorkflowOperator,
+ MwaaServerlessDeleteWorkflowOperator,
MwaaServerlessStartWorkflowRunOperator,
MwaaServerlessStopWorkflowRunOperator,
MwaaServerlessUpdateWorkflowOperator,
@@ -220,6 +221,61 @@ class TestMwaaServerlessUpdateWorkflowOperator:
validate_template_fields(self.operator)
+class TestMwaaServerlessDeleteWorkflowOperator:
+ def setup_method(self):
+ self.operator = MwaaServerlessDeleteWorkflowOperator(
+ task_id="delete_workflow",
+ workflow_arn=WORKFLOW_ARN,
+ )
+
+ @mock.patch.object(AwsBaseHook, "conn", new_callable=mock.PropertyMock)
+ def test_execute(self, mock_conn):
+ mock_client = mock.MagicMock()
+ mock_client.delete_workflow.return_value = {"WorkflowArn": WORKFLOW_ARN}
+ mock_conn.return_value = mock_client
+
+ result = self.operator.execute({})
+
+ mock_client.delete_workflow.assert_called_once_with(WorkflowArn=WORKFLOW_ARN)
+ assert result is None
+
+ @mock.patch.object(AwsBaseHook, "conn", new_callable=mock.PropertyMock)
+ def test_execute_with_version(self, mock_conn):
+ op = MwaaServerlessDeleteWorkflowOperator(
+ task_id="delete_workflow",
+ workflow_arn=WORKFLOW_ARN,
+ workflow_version="abc123def456abc123def456abc123de",
+ )
+ mock_client = mock.MagicMock()
+ mock_client.delete_workflow.return_value = {
+ "WorkflowArn": WORKFLOW_ARN,
+ "WorkflowVersion": "abc123def456abc123def456abc123de",
+ }
+ mock_conn.return_value = mock_client
+
+ result = op.execute({})
+
+ mock_client.delete_workflow.assert_called_once_with(
+ WorkflowArn=WORKFLOW_ARN,
+ WorkflowVersion="abc123def456abc123def456abc123de",
+ )
+ assert result is None
+
+ @mock.patch.object(AwsBaseHook, "conn", new_callable=mock.PropertyMock)
+ def test_execute_not_found(self, mock_conn):
+ mock_client = mock.MagicMock()
+ mock_client.delete_workflow.side_effect = ClientError(
+ {"Error": {"Code": "ResourceNotFoundException", "Message": "not found"}}, "DeleteWorkflow"
+ )
+ mock_conn.return_value = mock_client
+
+ with pytest.raises(ClientError, match="ResourceNotFoundException"):
+ self.operator.execute({})
+
+ def test_template_fields(self):
+ validate_template_fields(self.operator)
+
+
class TestMwaaServerlessStopWorkflowRunOperator:
def setup_method(self):
self.operator = MwaaServerlessStopWorkflowRunOperator(