This is an automated email from the ASF dual-hosted git repository.
vincbeck pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new e838a5a9d91 Add `MwaaServerlessStopWorkflowRunOperator` (#66548)
e838a5a9d91 is described below
commit e838a5a9d91facc90b375b39d7d509876c7f4d0d
Author: John Jackson <[email protected]>
AuthorDate: Fri May 8 10:10:49 2026 -0700
Add `MwaaServerlessStopWorkflowRunOperator` (#66548)
---
.../amazon/docs/operators/mwaa_serverless.rst | 14 ++++++++
.../amazon/aws/operators/mwaa_serverless.py | 38 ++++++++++++++++++++
.../system/amazon/aws/example_mwaa_serverless.py | 26 +++++++++-----
.../amazon/aws/operators/test_mwaa_serverless.py | 41 ++++++++++++++++++++++
4 files changed, 110 insertions(+), 9 deletions(-)
diff --git a/providers/amazon/docs/operators/mwaa_serverless.rst
b/providers/amazon/docs/operators/mwaa_serverless.rst
index 86906990c59..fdfd6ba45e5 100644
--- a/providers/amazon/docs/operators/mwaa_serverless.rst
+++ b/providers/amazon/docs/operators/mwaa_serverless.rst
@@ -68,3 +68,17 @@ To wait for an Amazon MWAA Serverless workflow run to
complete, use
:dedent: 4
:start-after: [START howto_sensor_mwaa_serverless_workflow_run]
:end-before: [END howto_sensor_mwaa_serverless_workflow_run]
+
+.. _howto/operator:MwaaServerlessStopWorkflowRunOperator:
+
+Stop a Workflow Run
+-------------------
+
+To stop a running Amazon MWAA Serverless workflow run, use
+:class:`~airflow.providers.amazon.aws.operators.mwaa_serverless.MwaaServerlessStopWorkflowRunOperator`.
+
+.. exampleinclude::
/../../amazon/tests/system/amazon/aws/example_mwaa_serverless.py
+ :language: python
+ :dedent: 4
+ :start-after: [START howto_operator_mwaa_serverless_stop_workflow_run]
+ :end-before: [END howto_operator_mwaa_serverless_stop_workflow_run]
diff --git
a/providers/amazon/src/airflow/providers/amazon/aws/operators/mwaa_serverless.py
b/providers/amazon/src/airflow/providers/amazon/aws/operators/mwaa_serverless.py
index 41633de37e1..f76a3354228 100644
---
a/providers/amazon/src/airflow/providers/amazon/aws/operators/mwaa_serverless.py
+++
b/providers/amazon/src/airflow/providers/amazon/aws/operators/mwaa_serverless.py
@@ -157,3 +157,41 @@ class
MwaaServerlessCreateWorkflowOperator(AwsBaseOperator[AwsBaseHook]):
raise
self.log.info("Workflow %s: %s", self.workflow_name, workflow_arn)
return workflow_arn
+
+
+class MwaaServerlessStopWorkflowRunOperator(AwsBaseOperator[AwsBaseHook]):
+ """
+ Stop a running Amazon MWAA Serverless workflow run.
+
+ .. seealso::
+ For more information on how to use this operator, take a look at the
guide:
+ :ref:`howto/operator:MwaaServerlessStopWorkflowRunOperator`
+
+ :param workflow_arn: The ARN of the workflow. (templated)
+ :param run_id: The ID of the run to stop. (templated)
+ """
+
+ aws_hook_class = AwsBaseHook
+ template_fields: tuple[str, ...] = aws_template_fields("workflow_arn",
"run_id")
+
+ def __init__(
+ self,
+ *,
+ workflow_arn: str,
+ run_id: str,
+ **kwargs,
+ ) -> None:
+ super().__init__(**kwargs)
+ self.workflow_arn = workflow_arn
+ self.run_id = run_id
+
+ @property
+ def _hook_parameters(self) -> dict[str, Any]:
+ return {**super()._hook_parameters, "client_type": "mwaa-serverless"}
+
+ def execute(self, context: Context) -> str:
+ self.log.info("Stopping workflow run %s", self.run_id)
+ response =
self.hook.conn.stop_workflow_run(WorkflowArn=self.workflow_arn,
RunId=self.run_id)
+ status = response["Status"]
+ self.log.info("Workflow run %s status: %s", self.run_id, status)
+ return status
diff --git
a/providers/amazon/tests/system/amazon/aws/example_mwaa_serverless.py
b/providers/amazon/tests/system/amazon/aws/example_mwaa_serverless.py
index 18ea3e8ddf9..9453fcbf8a2 100644
--- a/providers/amazon/tests/system/amazon/aws/example_mwaa_serverless.py
+++ b/providers/amazon/tests/system/amazon/aws/example_mwaa_serverless.py
@@ -21,6 +21,7 @@ from datetime import datetime
from airflow.providers.amazon.aws.operators.mwaa_serverless import (
MwaaServerlessCreateWorkflowOperator,
MwaaServerlessStartWorkflowRunOperator,
+ MwaaServerlessStopWorkflowRunOperator,
)
from airflow.providers.amazon.aws.operators.s3 import (
S3CreateBucketOperator,
@@ -60,14 +61,6 @@ systest_mwaa_serverless:
sys_test_context_task =
SystemTestContextBuilder().add_variable(ROLE_ARN_KEY).build()
-@task(trigger_rule=TriggerRule.ALL_DONE)
-def stop_workflow_run(workflow_arn: str, run_id: str):
- """Stop the workflow run."""
- import boto3
-
-
boto3.client("mwaa-serverless").stop_workflow_run(WorkflowArn=workflow_arn,
RunId=run_id)
-
-
@task(trigger_rule=TriggerRule.ALL_DONE)
def delete_workflow(workflow_arn: str):
"""Delete the MWAA Serverless workflow."""
@@ -124,6 +117,20 @@ with DAG(
)
# [END howto_sensor_mwaa_serverless_workflow_run]
+ # Start a second run to test stopping
+ start_workflow_2 = MwaaServerlessStartWorkflowRunOperator(
+ task_id="start_workflow_2",
+ workflow_arn=workflow_arn,
+ )
+
+ # [START howto_operator_mwaa_serverless_stop_workflow_run]
+ stop_workflow_run = MwaaServerlessStopWorkflowRunOperator(
+ task_id="stop_workflow_run",
+ workflow_arn=workflow_arn,
+ run_id=start_workflow_2.output,
+ )
+ # [END howto_operator_mwaa_serverless_stop_workflow_run]
+
delete_bucket = S3DeleteBucketOperator(
task_id="delete_bucket",
bucket_name=bucket_name,
@@ -138,7 +145,8 @@ with DAG(
workflow_arn,
start_workflow,
wait_for_run,
- stop_workflow_run(workflow_arn=workflow_arn,
run_id=start_workflow.output),
+ start_workflow_2,
+ stop_workflow_run,
delete_workflow(workflow_arn=workflow_arn),
delete_bucket,
)
diff --git
a/providers/amazon/tests/unit/amazon/aws/operators/test_mwaa_serverless.py
b/providers/amazon/tests/unit/amazon/aws/operators/test_mwaa_serverless.py
index b7535f825d3..2bc25674cb4 100644
--- a/providers/amazon/tests/unit/amazon/aws/operators/test_mwaa_serverless.py
+++ b/providers/amazon/tests/unit/amazon/aws/operators/test_mwaa_serverless.py
@@ -25,6 +25,7 @@ from airflow.providers.amazon.aws.hooks.base_aws import
AwsBaseHook
from airflow.providers.amazon.aws.operators.mwaa_serverless import (
MwaaServerlessCreateWorkflowOperator,
MwaaServerlessStartWorkflowRunOperator,
+ MwaaServerlessStopWorkflowRunOperator,
)
from unit.amazon.aws.utils.test_template_fields import validate_template_fields
@@ -146,3 +147,43 @@ class TestMwaaServerlessCreateWorkflowOperator:
def test_template_fields(self):
validate_template_fields(self.operator)
+
+
+class TestMwaaServerlessStopWorkflowRunOperator:
+ def setup_method(self):
+ self.operator = MwaaServerlessStopWorkflowRunOperator(
+ task_id="stop_run",
+ workflow_arn=WORKFLOW_ARN,
+ run_id=RUN_ID,
+ )
+
+ @mock.patch.object(AwsBaseHook, "conn", new_callable=mock.PropertyMock)
+ def test_execute(self, mock_conn):
+ mock_client = mock.MagicMock()
+ mock_client.stop_workflow_run.return_value = {
+ "WorkflowArn": WORKFLOW_ARN,
+ "RunId": RUN_ID,
+ "Status": "STOPPING",
+ }
+ mock_conn.return_value = mock_client
+
+ result = self.operator.execute({})
+
+
mock_client.stop_workflow_run.assert_called_once_with(WorkflowArn=WORKFLOW_ARN,
RunId=RUN_ID)
+ assert result == "STOPPING"
+
+ @mock.patch.object(AwsBaseHook, "conn", new_callable=mock.PropertyMock)
+ def test_execute_not_found(self, mock_conn):
+ from botocore.exceptions import ClientError
+
+ mock_client = mock.MagicMock()
+ mock_client.stop_workflow_run.side_effect = ClientError(
+ {"Error": {"Code": "ResourceNotFoundException", "Message": "not
found"}}, "StopWorkflowRun"
+ )
+ mock_conn.return_value = mock_client
+
+ with pytest.raises(ClientError):
+ self.operator.execute({})
+
+ def test_template_fields(self):
+ validate_template_fields(self.operator)