This is an automated email from the ASF dual-hosted git repository.

o-nikolas pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 09ea5bf9b10 Add `MwaaServerlessUpdateWorkflowOperator` (#66833)
09ea5bf9b10 is described below

commit 09ea5bf9b1017a20c1cc4d26e5c0a6bb09ca0098
Author: Karthik Seshadri <[email protected]>
AuthorDate: Tue May 12 23:28:15 2026 -0700

    Add `MwaaServerlessUpdateWorkflowOperator` (#66833)
    
    Add operator to update existing Amazon MWAA Serverless workflows,
    supporting workflow definition, role, and description parameters.
    Follows the same pattern as MwaaServerlessCreateWorkflowOperator.
---
 .../amazon/docs/operators/mwaa_serverless.rst      | 14 +++++
 .../amazon/aws/operators/mwaa_serverless.py        | 56 +++++++++++++++++
 .../system/amazon/aws/example_mwaa_serverless.py   | 12 ++++
 .../amazon/aws/operators/test_mwaa_serverless.py   | 71 ++++++++++++++++++++++
 4 files changed, 153 insertions(+)

diff --git a/providers/amazon/docs/operators/mwaa_serverless.rst 
b/providers/amazon/docs/operators/mwaa_serverless.rst
index fdfd6ba45e5..b64eb8b0b4f 100644
--- a/providers/amazon/docs/operators/mwaa_serverless.rst
+++ b/providers/amazon/docs/operators/mwaa_serverless.rst
@@ -69,6 +69,20 @@ To wait for an Amazon MWAA Serverless workflow run to 
complete, use
     :start-after: [START howto_sensor_mwaa_serverless_workflow_run]
     :end-before: [END howto_sensor_mwaa_serverless_workflow_run]
 
+.. _howto/operator:MwaaServerlessUpdateWorkflowOperator:
+
+Update a Workflow
+-----------------
+
+To update an existing Amazon MWAA Serverless workflow, use
+:class:`~airflow.providers.amazon.aws.operators.mwaa_serverless.MwaaServerlessUpdateWorkflowOperator`.
+
+.. exampleinclude:: 
/../../amazon/tests/system/amazon/aws/example_mwaa_serverless.py
+    :language: python
+    :dedent: 4
+    :start-after: [START howto_operator_mwaa_serverless_update_workflow]
+    :end-before: [END howto_operator_mwaa_serverless_update_workflow]
+
 .. _howto/operator:MwaaServerlessStopWorkflowRunOperator:
 
 Stop a Workflow Run
diff --git 
a/providers/amazon/src/airflow/providers/amazon/aws/operators/mwaa_serverless.py
 
b/providers/amazon/src/airflow/providers/amazon/aws/operators/mwaa_serverless.py
index f76a3354228..25b3b038634 100644
--- 
a/providers/amazon/src/airflow/providers/amazon/aws/operators/mwaa_serverless.py
+++ 
b/providers/amazon/src/airflow/providers/amazon/aws/operators/mwaa_serverless.py
@@ -159,6 +159,62 @@ class 
MwaaServerlessCreateWorkflowOperator(AwsBaseOperator[AwsBaseHook]):
         return workflow_arn
 
 
+class MwaaServerlessUpdateWorkflowOperator(AwsBaseOperator[AwsBaseHook]):
+    """
+    Update an existing Amazon MWAA Serverless workflow.
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the 
guide:
+        :ref:`howto/operator:MwaaServerlessUpdateWorkflowOperator`
+
+    :param workflow_arn: The ARN of the workflow to update. (templated)
+    :param definition_s3_location: Dict with ``Bucket``, ``ObjectKey``, and 
optionally
+        ``VersionId`` for the updated YAML definition. (templated)
+    :param role_arn: The execution role ARN. (templated)
+    :param description: Optional updated description. (templated)
+    """
+
+    aws_hook_class = AwsBaseHook
+    template_fields: tuple[str, ...] = aws_template_fields(
+        "workflow_arn", "definition_s3_location", "role_arn", "description"
+    )
+    template_fields_renderers = {"definition_s3_location": "json"}
+
+    def __init__(
+        self,
+        *,
+        workflow_arn: str,
+        definition_s3_location: dict[str, str],
+        role_arn: str,
+        description: str | None = None,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.workflow_arn = workflow_arn
+        self.definition_s3_location = definition_s3_location
+        self.role_arn = role_arn
+        self.description = description
+
+    @property
+    def _hook_parameters(self) -> dict[str, Any]:
+        return {**super()._hook_parameters, "client_type": "mwaa-serverless"}
+
+    def execute(self, context: Context) -> str:
+        self.log.info("Updating MWAA Serverless workflow %s", 
self.workflow_arn)
+        kwargs: dict[str, Any] = prune_dict(
+            {
+                "WorkflowArn": self.workflow_arn,
+                "DefinitionS3Location": self.definition_s3_location,
+                "RoleArn": self.role_arn,
+                "Description": self.description,
+            }
+        )
+        response = self.hook.conn.update_workflow(**kwargs)
+        workflow_arn = response["WorkflowArn"]
+        self.log.info("Workflow %s updated to version %s", workflow_arn, 
response.get("WorkflowVersion"))
+        return workflow_arn
+
+
 class MwaaServerlessStopWorkflowRunOperator(AwsBaseOperator[AwsBaseHook]):
     """
     Stop a running Amazon MWAA Serverless workflow run.
diff --git 
a/providers/amazon/tests/system/amazon/aws/example_mwaa_serverless.py 
b/providers/amazon/tests/system/amazon/aws/example_mwaa_serverless.py
index 9453fcbf8a2..52736f0b87a 100644
--- a/providers/amazon/tests/system/amazon/aws/example_mwaa_serverless.py
+++ b/providers/amazon/tests/system/amazon/aws/example_mwaa_serverless.py
@@ -22,6 +22,7 @@ from airflow.providers.amazon.aws.operators.mwaa_serverless 
import (
     MwaaServerlessCreateWorkflowOperator,
     MwaaServerlessStartWorkflowRunOperator,
     MwaaServerlessStopWorkflowRunOperator,
+    MwaaServerlessUpdateWorkflowOperator,
 )
 from airflow.providers.amazon.aws.operators.s3 import (
     S3CreateBucketOperator,
@@ -117,6 +118,16 @@ with DAG(
     )
     # [END howto_sensor_mwaa_serverless_workflow_run]
 
+    # [START howto_operator_mwaa_serverless_update_workflow]
+    update_workflow = MwaaServerlessUpdateWorkflowOperator(
+        task_id="update_workflow",
+        workflow_arn=workflow_arn,
+        definition_s3_location={"Bucket": bucket_name, "ObjectKey": 
"workflow.yaml"},
+        role_arn=role_arn,
+        description="Updated system test workflow",
+    )
+    # [END howto_operator_mwaa_serverless_update_workflow]
+
     # Start a second run to test stopping
     start_workflow_2 = MwaaServerlessStartWorkflowRunOperator(
         task_id="start_workflow_2",
@@ -145,6 +156,7 @@ with DAG(
         workflow_arn,
         start_workflow,
         wait_for_run,
+        update_workflow,
         start_workflow_2,
         stop_workflow_run,
         delete_workflow(workflow_arn=workflow_arn),
diff --git 
a/providers/amazon/tests/unit/amazon/aws/operators/test_mwaa_serverless.py 
b/providers/amazon/tests/unit/amazon/aws/operators/test_mwaa_serverless.py
index 2bc25674cb4..33f98abab1b 100644
--- a/providers/amazon/tests/unit/amazon/aws/operators/test_mwaa_serverless.py
+++ b/providers/amazon/tests/unit/amazon/aws/operators/test_mwaa_serverless.py
@@ -26,6 +26,7 @@ from airflow.providers.amazon.aws.operators.mwaa_serverless 
import (
     MwaaServerlessCreateWorkflowOperator,
     MwaaServerlessStartWorkflowRunOperator,
     MwaaServerlessStopWorkflowRunOperator,
+    MwaaServerlessUpdateWorkflowOperator,
 )
 
 from unit.amazon.aws.utils.test_template_fields import validate_template_fields
@@ -149,6 +150,76 @@ class TestMwaaServerlessCreateWorkflowOperator:
         validate_template_fields(self.operator)
 
 
+class TestMwaaServerlessUpdateWorkflowOperator:
+    def setup_method(self):
+        self.operator = MwaaServerlessUpdateWorkflowOperator(
+            task_id="update_workflow",
+            workflow_arn=WORKFLOW_ARN,
+            definition_s3_location=S3_LOCATION,
+            role_arn=ROLE_ARN,
+        )
+
+    @mock.patch.object(AwsBaseHook, "conn", new_callable=mock.PropertyMock)
+    def test_execute(self, mock_conn):
+        mock_client = mock.MagicMock()
+        mock_client.update_workflow.return_value = {
+            "WorkflowArn": WORKFLOW_ARN,
+            "WorkflowVersion": "abc123",
+            "ModifiedAt": "2026-05-12T00:00:00Z",
+        }
+        mock_conn.return_value = mock_client
+
+        result = self.operator.execute({})
+
+        mock_client.update_workflow.assert_called_once_with(
+            WorkflowArn=WORKFLOW_ARN,
+            DefinitionS3Location=S3_LOCATION,
+            RoleArn=ROLE_ARN,
+        )
+        assert result == WORKFLOW_ARN
+
+    @mock.patch.object(AwsBaseHook, "conn", new_callable=mock.PropertyMock)
+    def test_execute_with_description(self, mock_conn):
+        op = MwaaServerlessUpdateWorkflowOperator(
+            task_id="update_workflow",
+            workflow_arn=WORKFLOW_ARN,
+            definition_s3_location=S3_LOCATION,
+            role_arn=ROLE_ARN,
+            description="Updated workflow",
+        )
+        mock_client = mock.MagicMock()
+        mock_client.update_workflow.return_value = {
+            "WorkflowArn": WORKFLOW_ARN,
+            "WorkflowVersion": "def456",
+            "ModifiedAt": "2026-05-12T00:00:00Z",
+        }
+        mock_conn.return_value = mock_client
+
+        result = op.execute({})
+
+        mock_client.update_workflow.assert_called_once_with(
+            WorkflowArn=WORKFLOW_ARN,
+            DefinitionS3Location=S3_LOCATION,
+            RoleArn=ROLE_ARN,
+            Description="Updated workflow",
+        )
+        assert result == WORKFLOW_ARN
+
+    @mock.patch.object(AwsBaseHook, "conn", new_callable=mock.PropertyMock)
+    def test_execute_not_found(self, mock_conn):
+        mock_client = mock.MagicMock()
+        mock_client.update_workflow.side_effect = ClientError(
+            {"Error": {"Code": "ResourceNotFoundException", "Message": "not 
found"}}, "UpdateWorkflow"
+        )
+        mock_conn.return_value = mock_client
+
+        with pytest.raises(ClientError):
+            self.operator.execute({})
+
+    def test_template_fields(self):
+        validate_template_fields(self.operator)
+
+
 class TestMwaaServerlessStopWorkflowRunOperator:
     def setup_method(self):
         self.operator = MwaaServerlessStopWorkflowRunOperator(

Reply via email to