This is an automated email from the ASF dual-hosted git repository.
vincbeck pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new df00cd6dba2 Fix CreateWorkflowOperator if_exists=skip ARN construction (#66974)
df00cd6dba2 is described below
commit df00cd6dba2713465196b914175855d70e5731df
Author: Karthik Seshadri <[email protected]>
AuthorDate: Fri May 15 10:03:15 2026 -0700
Fix CreateWorkflowOperator if_exists=skip ARN construction (#66974)
---
.../providers/amazon/aws/operators/mwaa_serverless.py | 8 ++++++--
.../system/amazon/aws/example_mwaa_serverless.py | 10 ++++++++++
.../unit/amazon/aws/operators/test_mwaa_serverless.py | 19 ++++++++++++++-----
3 files changed, 30 insertions(+), 7 deletions(-)
diff --git a/providers/amazon/src/airflow/providers/amazon/aws/operators/mwaa_serverless.py b/providers/amazon/src/airflow/providers/amazon/aws/operators/mwaa_serverless.py
index cd629505bed..b8a31f7f1a5 100644
--- a/providers/amazon/src/airflow/providers/amazon/aws/operators/mwaa_serverless.py
+++ b/providers/amazon/src/airflow/providers/amazon/aws/operators/mwaa_serverless.py
@@ -151,8 +151,12 @@ class MwaaServerlessCreateWorkflowOperator(AwsBaseOperator[AwsBaseHook]):
except ClientError as e:
if e.response["Error"]["Code"] == "ConflictException" and self.if_exists == "skip":
self.log.info("Workflow %s already exists, skipping.", self.workflow_name)
- response = self.hook.conn.get_workflow(WorkflowArn=self.workflow_name)
- workflow_arn = response["WorkflowArn"]
+ resource_id = e.response["ResourceId"]
+ workflow_arn = (
+ f"arn:{self.hook.conn_partition}:airflow-serverless"
+ f":{self.hook.conn_region_name}:{self.hook.account_id}"
+ f":workflow/{resource_id}"
+ )
else:
raise
self.log.info("Workflow %s: %s", self.workflow_name, workflow_arn)
diff --git a/providers/amazon/tests/system/amazon/aws/example_mwaa_serverless.py b/providers/amazon/tests/system/amazon/aws/example_mwaa_serverless.py
index d1d6f6bdcb1..c36801884cc 100644
--- a/providers/amazon/tests/system/amazon/aws/example_mwaa_serverless.py
+++ b/providers/amazon/tests/system/amazon/aws/example_mwaa_serverless.py
@@ -93,6 +93,15 @@ with DAG(
workflow_arn = create_workflow.output
+ # Test idempotent create (if_exists="skip" path)
+ create_workflow_again = MwaaServerlessCreateWorkflowOperator(
+ task_id="create_workflow_again",
+ workflow_name=bucket_name,
+ definition_s3_location={"Bucket": bucket_name, "ObjectKey": "workflow.yaml"},
+ role_arn=role_arn,
+ if_exists="skip",
+ )
+
# [START howto_operator_mwaa_serverless_start_workflow_run]
start_workflow = MwaaServerlessStartWorkflowRunOperator(
task_id="start_workflow",
@@ -156,6 +165,7 @@ with DAG(
upload_workflow_yaml,
workflow_arn,
# TEST BODY
+ create_workflow_again,
start_workflow,
wait_for_run,
update_workflow,
diff --git a/providers/amazon/tests/unit/amazon/aws/operators/test_mwaa_serverless.py b/providers/amazon/tests/unit/amazon/aws/operators/test_mwaa_serverless.py
index 69397ebd15c..9b2be23881b 100644
--- a/providers/amazon/tests/unit/amazon/aws/operators/test_mwaa_serverless.py
+++ b/providers/amazon/tests/unit/amazon/aws/operators/test_mwaa_serverless.py
@@ -115,18 +115,27 @@ class TestMwaaServerlessCreateWorkflowOperator:
)
assert result == WORKFLOW_ARN
+ @mock.patch.object(AwsBaseHook, "account_id", new_callable=mock.PropertyMock)
+ @mock.patch.object(AwsBaseHook, "conn_region_name", new_callable=mock.PropertyMock)
+ @mock.patch.object(AwsBaseHook, "conn_partition", new_callable=mock.PropertyMock)
@mock.patch.object(AwsBaseHook, "conn", new_callable=mock.PropertyMock)
- def test_execute_skip_existing(self, mock_conn):
+ def test_execute_skip_existing(self, mock_conn, mock_partition, mock_region, mock_account):
mock_client = mock.MagicMock()
mock_client.create_workflow.side_effect = ClientError(
- {"Error": {"Code": "ConflictException", "Message": "Already exists"}},
+ {
+ "Error": {"Code": "ConflictException", "Message": "Already exists"},
+ "ResourceId": "test-workflow-aBcDeFgHiJ",
+ "ResourceType": "Workflow",
+ },
"CreateWorkflow",
)
- mock_client.get_workflow.return_value = {"WorkflowArn": WORKFLOW_ARN}
mock_conn.return_value = mock_client
+ mock_partition.return_value = "aws"
+ mock_region.return_value = "us-east-1"
+ mock_account.return_value = "123456789012"
result = self.operator.execute({})
- assert result == WORKFLOW_ARN
+ assert result == "arn:aws:airflow-serverless:us-east-1:123456789012:workflow/test-workflow-aBcDeFgHiJ"
@mock.patch.object(AwsBaseHook, "conn", new_callable=mock.PropertyMock)
def test_execute_fail_on_conflict(self, mock_conn):
@@ -144,7 +153,7 @@ class TestMwaaServerlessCreateWorkflowOperator:
)
mock_conn.return_value = mock_client
- with pytest.raises(ClientError):
+ with pytest.raises(ClientError, match="ConflictException"):
op.execute({})
def test_template_fields(self):