This is an automated email from the ASF dual-hosted git repository.
o-nikolas pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 09ea5bf9b10 Add `MwaaServerlessUpdateWorkflowOperator` (#66833)
09ea5bf9b10 is described below
commit 09ea5bf9b1017a20c1cc4d26e5c0a6bb09ca0098
Author: Karthik Seshadri <[email protected]>
AuthorDate: Tue May 12 23:28:15 2026 -0700
Add `MwaaServerlessUpdateWorkflowOperator` (#66833)
Add operator to update existing Amazon MWAA Serverless workflows,
supporting workflow definition, role, and description parameters.
Follows the same pattern as MwaaServerlessCreateWorkflowOperator.
---
.../amazon/docs/operators/mwaa_serverless.rst | 14 +++++
.../amazon/aws/operators/mwaa_serverless.py | 56 +++++++++++++++++
.../system/amazon/aws/example_mwaa_serverless.py | 12 ++++
.../amazon/aws/operators/test_mwaa_serverless.py | 71 ++++++++++++++++++++++
4 files changed, 153 insertions(+)
diff --git a/providers/amazon/docs/operators/mwaa_serverless.rst b/providers/amazon/docs/operators/mwaa_serverless.rst
index fdfd6ba45e5..b64eb8b0b4f 100644
--- a/providers/amazon/docs/operators/mwaa_serverless.rst
+++ b/providers/amazon/docs/operators/mwaa_serverless.rst
@@ -69,6 +69,20 @@ To wait for an Amazon MWAA Serverless workflow run to complete, use
:start-after: [START howto_sensor_mwaa_serverless_workflow_run]
:end-before: [END howto_sensor_mwaa_serverless_workflow_run]
+.. _howto/operator:MwaaServerlessUpdateWorkflowOperator:
+
+Update a Workflow
+-----------------
+
+To update an existing Amazon MWAA Serverless workflow, use
+:class:`~airflow.providers.amazon.aws.operators.mwaa_serverless.MwaaServerlessUpdateWorkflowOperator`.
+
+.. exampleinclude:: /../../amazon/tests/system/amazon/aws/example_mwaa_serverless.py
+ :language: python
+ :dedent: 4
+ :start-after: [START howto_operator_mwaa_serverless_update_workflow]
+ :end-before: [END howto_operator_mwaa_serverless_update_workflow]
+
.. _howto/operator:MwaaServerlessStopWorkflowRunOperator:
Stop a Workflow Run
diff --git a/providers/amazon/src/airflow/providers/amazon/aws/operators/mwaa_serverless.py b/providers/amazon/src/airflow/providers/amazon/aws/operators/mwaa_serverless.py
index f76a3354228..25b3b038634 100644
--- a/providers/amazon/src/airflow/providers/amazon/aws/operators/mwaa_serverless.py
+++ b/providers/amazon/src/airflow/providers/amazon/aws/operators/mwaa_serverless.py
@@ -159,6 +159,62 @@ class MwaaServerlessCreateWorkflowOperator(AwsBaseOperator[AwsBaseHook]):
return workflow_arn
+class MwaaServerlessUpdateWorkflowOperator(AwsBaseOperator[AwsBaseHook]):
+ """
+ Update an existing Amazon MWAA Serverless workflow.
+
+ .. seealso::
+ For more information on how to use this operator, take a look at the guide:
+ :ref:`howto/operator:MwaaServerlessUpdateWorkflowOperator`
+
+ :param workflow_arn: The ARN of the workflow to update. (templated)
+ :param definition_s3_location: Dict with ``Bucket``, ``ObjectKey``, and optionally
+ ``VersionId`` for the updated YAML definition. (templated)
+ :param role_arn: The execution role ARN. (templated)
+ :param description: Optional updated description. (templated)
+ """
+
+ aws_hook_class = AwsBaseHook
+ template_fields: tuple[str, ...] = aws_template_fields(
+ "workflow_arn", "definition_s3_location", "role_arn", "description"
+ )
+ template_fields_renderers = {"definition_s3_location": "json"}
+
+ def __init__(
+ self,
+ *,
+ workflow_arn: str,
+ definition_s3_location: dict[str, str],
+ role_arn: str,
+ description: str | None = None,
+ **kwargs,
+ ) -> None:
+ super().__init__(**kwargs)
+ self.workflow_arn = workflow_arn
+ self.definition_s3_location = definition_s3_location
+ self.role_arn = role_arn
+ self.description = description
+
+ @property
+ def _hook_parameters(self) -> dict[str, Any]:
+ return {**super()._hook_parameters, "client_type": "mwaa-serverless"}
+
+ def execute(self, context: Context) -> str:
+ self.log.info("Updating MWAA Serverless workflow %s", self.workflow_arn)
+ kwargs: dict[str, Any] = prune_dict(
+ {
+ "WorkflowArn": self.workflow_arn,
+ "DefinitionS3Location": self.definition_s3_location,
+ "RoleArn": self.role_arn,
+ "Description": self.description,
+ }
+ )
+ response = self.hook.conn.update_workflow(**kwargs)
+ workflow_arn = response["WorkflowArn"]
+ self.log.info("Workflow %s updated to version %s", workflow_arn, response.get("WorkflowVersion"))
+ return workflow_arn
+
+
class MwaaServerlessStopWorkflowRunOperator(AwsBaseOperator[AwsBaseHook]):
"""
Stop a running Amazon MWAA Serverless workflow run.
diff --git a/providers/amazon/tests/system/amazon/aws/example_mwaa_serverless.py b/providers/amazon/tests/system/amazon/aws/example_mwaa_serverless.py
index 9453fcbf8a2..52736f0b87a 100644
--- a/providers/amazon/tests/system/amazon/aws/example_mwaa_serverless.py
+++ b/providers/amazon/tests/system/amazon/aws/example_mwaa_serverless.py
@@ -22,6 +22,7 @@ from airflow.providers.amazon.aws.operators.mwaa_serverless import (
MwaaServerlessCreateWorkflowOperator,
MwaaServerlessStartWorkflowRunOperator,
MwaaServerlessStopWorkflowRunOperator,
+ MwaaServerlessUpdateWorkflowOperator,
)
from airflow.providers.amazon.aws.operators.s3 import (
S3CreateBucketOperator,
@@ -117,6 +118,16 @@ with DAG(
)
# [END howto_sensor_mwaa_serverless_workflow_run]
+ # [START howto_operator_mwaa_serverless_update_workflow]
+ update_workflow = MwaaServerlessUpdateWorkflowOperator(
+ task_id="update_workflow",
+ workflow_arn=workflow_arn,
+ definition_s3_location={"Bucket": bucket_name, "ObjectKey": "workflow.yaml"},
+ role_arn=role_arn,
+ description="Updated system test workflow",
+ )
+ # [END howto_operator_mwaa_serverless_update_workflow]
+
# Start a second run to test stopping
start_workflow_2 = MwaaServerlessStartWorkflowRunOperator(
task_id="start_workflow_2",
@@ -145,6 +156,7 @@ with DAG(
workflow_arn,
start_workflow,
wait_for_run,
+ update_workflow,
start_workflow_2,
stop_workflow_run,
delete_workflow(workflow_arn=workflow_arn),
diff --git a/providers/amazon/tests/unit/amazon/aws/operators/test_mwaa_serverless.py b/providers/amazon/tests/unit/amazon/aws/operators/test_mwaa_serverless.py
index 2bc25674cb4..33f98abab1b 100644
--- a/providers/amazon/tests/unit/amazon/aws/operators/test_mwaa_serverless.py
+++ b/providers/amazon/tests/unit/amazon/aws/operators/test_mwaa_serverless.py
@@ -26,6 +26,7 @@ from airflow.providers.amazon.aws.operators.mwaa_serverless import (
MwaaServerlessCreateWorkflowOperator,
MwaaServerlessStartWorkflowRunOperator,
MwaaServerlessStopWorkflowRunOperator,
+ MwaaServerlessUpdateWorkflowOperator,
)
from unit.amazon.aws.utils.test_template_fields import validate_template_fields
@@ -149,6 +150,76 @@ class TestMwaaServerlessCreateWorkflowOperator:
validate_template_fields(self.operator)
+class TestMwaaServerlessUpdateWorkflowOperator:
+ def setup_method(self):
+ self.operator = MwaaServerlessUpdateWorkflowOperator(
+ task_id="update_workflow",
+ workflow_arn=WORKFLOW_ARN,
+ definition_s3_location=S3_LOCATION,
+ role_arn=ROLE_ARN,
+ )
+
+ @mock.patch.object(AwsBaseHook, "conn", new_callable=mock.PropertyMock)
+ def test_execute(self, mock_conn):
+ mock_client = mock.MagicMock()
+ mock_client.update_workflow.return_value = {
+ "WorkflowArn": WORKFLOW_ARN,
+ "WorkflowVersion": "abc123",
+ "ModifiedAt": "2026-05-12T00:00:00Z",
+ }
+ mock_conn.return_value = mock_client
+
+ result = self.operator.execute({})
+
+ mock_client.update_workflow.assert_called_once_with(
+ WorkflowArn=WORKFLOW_ARN,
+ DefinitionS3Location=S3_LOCATION,
+ RoleArn=ROLE_ARN,
+ )
+ assert result == WORKFLOW_ARN
+
+ @mock.patch.object(AwsBaseHook, "conn", new_callable=mock.PropertyMock)
+ def test_execute_with_description(self, mock_conn):
+ op = MwaaServerlessUpdateWorkflowOperator(
+ task_id="update_workflow",
+ workflow_arn=WORKFLOW_ARN,
+ definition_s3_location=S3_LOCATION,
+ role_arn=ROLE_ARN,
+ description="Updated workflow",
+ )
+ mock_client = mock.MagicMock()
+ mock_client.update_workflow.return_value = {
+ "WorkflowArn": WORKFLOW_ARN,
+ "WorkflowVersion": "def456",
+ "ModifiedAt": "2026-05-12T00:00:00Z",
+ }
+ mock_conn.return_value = mock_client
+
+ result = op.execute({})
+
+ mock_client.update_workflow.assert_called_once_with(
+ WorkflowArn=WORKFLOW_ARN,
+ DefinitionS3Location=S3_LOCATION,
+ RoleArn=ROLE_ARN,
+ Description="Updated workflow",
+ )
+ assert result == WORKFLOW_ARN
+
+ @mock.patch.object(AwsBaseHook, "conn", new_callable=mock.PropertyMock)
+ def test_execute_not_found(self, mock_conn):
+ mock_client = mock.MagicMock()
+ mock_client.update_workflow.side_effect = ClientError(
+ {"Error": {"Code": "ResourceNotFoundException", "Message": "not found"}}, "UpdateWorkflow"
+ )
+ mock_conn.return_value = mock_client
+
+ with pytest.raises(ClientError):
+ self.operator.execute({})
+
+ def test_template_fields(self):
+ validate_template_fields(self.operator)
+
+
class TestMwaaServerlessStopWorkflowRunOperator:
def setup_method(self):
self.operator = MwaaServerlessStopWorkflowRunOperator(