This is an automated email from the ASF dual-hosted git repository.

vincbeck pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new e838a5a9d91 Add `MwaaServerlessStopWorkflowRunOperator` (#66548)
e838a5a9d91 is described below

commit e838a5a9d91facc90b375b39d7d509876c7f4d0d
Author: John Jackson <[email protected]>
AuthorDate: Fri May 8 10:10:49 2026 -0700

    Add `MwaaServerlessStopWorkflowRunOperator` (#66548)
---
 .../amazon/docs/operators/mwaa_serverless.rst      | 14 ++++++++
 .../amazon/aws/operators/mwaa_serverless.py        | 38 ++++++++++++++++++++
 .../system/amazon/aws/example_mwaa_serverless.py   | 26 +++++++++-----
 .../amazon/aws/operators/test_mwaa_serverless.py   | 41 ++++++++++++++++++++++
 4 files changed, 110 insertions(+), 9 deletions(-)

diff --git a/providers/amazon/docs/operators/mwaa_serverless.rst b/providers/amazon/docs/operators/mwaa_serverless.rst
index 86906990c59..fdfd6ba45e5 100644
--- a/providers/amazon/docs/operators/mwaa_serverless.rst
+++ b/providers/amazon/docs/operators/mwaa_serverless.rst
@@ -68,3 +68,17 @@ To wait for an Amazon MWAA Serverless workflow run to complete, use
     :dedent: 4
     :start-after: [START howto_sensor_mwaa_serverless_workflow_run]
     :end-before: [END howto_sensor_mwaa_serverless_workflow_run]
+
+.. _howto/operator:MwaaServerlessStopWorkflowRunOperator:
+
+Stop a Workflow Run
+-------------------
+
+To stop a running Amazon MWAA Serverless workflow run, use
+:class:`~airflow.providers.amazon.aws.operators.mwaa_serverless.MwaaServerlessStopWorkflowRunOperator`.
+
+.. exampleinclude:: /../../amazon/tests/system/amazon/aws/example_mwaa_serverless.py
+    :language: python
+    :dedent: 4
+    :start-after: [START howto_operator_mwaa_serverless_stop_workflow_run]
+    :end-before: [END howto_operator_mwaa_serverless_stop_workflow_run]
diff --git a/providers/amazon/src/airflow/providers/amazon/aws/operators/mwaa_serverless.py b/providers/amazon/src/airflow/providers/amazon/aws/operators/mwaa_serverless.py
index 41633de37e1..f76a3354228 100644
--- a/providers/amazon/src/airflow/providers/amazon/aws/operators/mwaa_serverless.py
+++ b/providers/amazon/src/airflow/providers/amazon/aws/operators/mwaa_serverless.py
@@ -157,3 +157,41 @@ class MwaaServerlessCreateWorkflowOperator(AwsBaseOperator[AwsBaseHook]):
                 raise
         self.log.info("Workflow %s: %s", self.workflow_name, workflow_arn)
         return workflow_arn
+
+
+class MwaaServerlessStopWorkflowRunOperator(AwsBaseOperator[AwsBaseHook]):
+    """
+    Stop a running Amazon MWAA Serverless workflow run.
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:MwaaServerlessStopWorkflowRunOperator`
+
+    :param workflow_arn: The ARN of the workflow. (templated)
+    :param run_id: The ID of the run to stop. (templated)
+    """
+
+    aws_hook_class = AwsBaseHook
+    template_fields: tuple[str, ...] = aws_template_fields("workflow_arn", "run_id")
+
+    def __init__(
+        self,
+        *,
+        workflow_arn: str,
+        run_id: str,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.workflow_arn = workflow_arn
+        self.run_id = run_id
+
+    @property
+    def _hook_parameters(self) -> dict[str, Any]:
+        return {**super()._hook_parameters, "client_type": "mwaa-serverless"}
+
+    def execute(self, context: Context) -> str:
+        self.log.info("Stopping workflow run %s", self.run_id)
+        response = self.hook.conn.stop_workflow_run(WorkflowArn=self.workflow_arn, RunId=self.run_id)
+        status = response["Status"]
+        self.log.info("Workflow run %s status: %s", self.run_id, status)
+        return status
diff --git a/providers/amazon/tests/system/amazon/aws/example_mwaa_serverless.py b/providers/amazon/tests/system/amazon/aws/example_mwaa_serverless.py
index 18ea3e8ddf9..9453fcbf8a2 100644
--- a/providers/amazon/tests/system/amazon/aws/example_mwaa_serverless.py
+++ b/providers/amazon/tests/system/amazon/aws/example_mwaa_serverless.py
@@ -21,6 +21,7 @@ from datetime import datetime
 from airflow.providers.amazon.aws.operators.mwaa_serverless import (
     MwaaServerlessCreateWorkflowOperator,
     MwaaServerlessStartWorkflowRunOperator,
+    MwaaServerlessStopWorkflowRunOperator,
 )
 from airflow.providers.amazon.aws.operators.s3 import (
     S3CreateBucketOperator,
@@ -60,14 +61,6 @@ systest_mwaa_serverless:
 sys_test_context_task = SystemTestContextBuilder().add_variable(ROLE_ARN_KEY).build()
 
 
-@task(trigger_rule=TriggerRule.ALL_DONE)
-def stop_workflow_run(workflow_arn: str, run_id: str):
-    """Stop the workflow run."""
-    import boto3
-
-    boto3.client("mwaa-serverless").stop_workflow_run(WorkflowArn=workflow_arn, RunId=run_id)
-
-
 @task(trigger_rule=TriggerRule.ALL_DONE)
 def delete_workflow(workflow_arn: str):
     """Delete the MWAA Serverless workflow."""
@@ -124,6 +117,20 @@ with DAG(
     )
     # [END howto_sensor_mwaa_serverless_workflow_run]
 
+    # Start a second run to test stopping
+    start_workflow_2 = MwaaServerlessStartWorkflowRunOperator(
+        task_id="start_workflow_2",
+        workflow_arn=workflow_arn,
+    )
+
+    # [START howto_operator_mwaa_serverless_stop_workflow_run]
+    stop_workflow_run = MwaaServerlessStopWorkflowRunOperator(
+        task_id="stop_workflow_run",
+        workflow_arn=workflow_arn,
+        run_id=start_workflow_2.output,
+    )
+    # [END howto_operator_mwaa_serverless_stop_workflow_run]
+
     delete_bucket = S3DeleteBucketOperator(
         task_id="delete_bucket",
         bucket_name=bucket_name,
@@ -138,7 +145,8 @@ with DAG(
         workflow_arn,
         start_workflow,
         wait_for_run,
-        stop_workflow_run(workflow_arn=workflow_arn, run_id=start_workflow.output),
+        start_workflow_2,
+        stop_workflow_run,
         delete_workflow(workflow_arn=workflow_arn),
         delete_bucket,
     )
diff --git a/providers/amazon/tests/unit/amazon/aws/operators/test_mwaa_serverless.py b/providers/amazon/tests/unit/amazon/aws/operators/test_mwaa_serverless.py
index b7535f825d3..2bc25674cb4 100644
--- a/providers/amazon/tests/unit/amazon/aws/operators/test_mwaa_serverless.py
+++ b/providers/amazon/tests/unit/amazon/aws/operators/test_mwaa_serverless.py
@@ -25,6 +25,7 @@ from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook
 from airflow.providers.amazon.aws.operators.mwaa_serverless import (
     MwaaServerlessCreateWorkflowOperator,
     MwaaServerlessStartWorkflowRunOperator,
+    MwaaServerlessStopWorkflowRunOperator,
 )
 
 from unit.amazon.aws.utils.test_template_fields import validate_template_fields
@@ -146,3 +147,43 @@ class TestMwaaServerlessCreateWorkflowOperator:
 
     def test_template_fields(self):
         validate_template_fields(self.operator)
+
+
+class TestMwaaServerlessStopWorkflowRunOperator:
+    def setup_method(self):
+        self.operator = MwaaServerlessStopWorkflowRunOperator(
+            task_id="stop_run",
+            workflow_arn=WORKFLOW_ARN,
+            run_id=RUN_ID,
+        )
+
+    @mock.patch.object(AwsBaseHook, "conn", new_callable=mock.PropertyMock)
+    def test_execute(self, mock_conn):
+        mock_client = mock.MagicMock()
+        mock_client.stop_workflow_run.return_value = {
+            "WorkflowArn": WORKFLOW_ARN,
+            "RunId": RUN_ID,
+            "Status": "STOPPING",
+        }
+        mock_conn.return_value = mock_client
+
+        result = self.operator.execute({})
+
+        mock_client.stop_workflow_run.assert_called_once_with(WorkflowArn=WORKFLOW_ARN, RunId=RUN_ID)
+        assert result == "STOPPING"
+
+    @mock.patch.object(AwsBaseHook, "conn", new_callable=mock.PropertyMock)
+    def test_execute_not_found(self, mock_conn):
+        from botocore.exceptions import ClientError
+
+        mock_client = mock.MagicMock()
+        mock_client.stop_workflow_run.side_effect = ClientError(
+            {"Error": {"Code": "ResourceNotFoundException", "Message": "not found"}}, "StopWorkflowRun"
+        )
+        mock_conn.return_value = mock_client
+
+        with pytest.raises(ClientError):
+            self.operator.execute({})
+
+    def test_template_fields(self):
+        validate_template_fields(self.operator)

Reply via email to