This is an automated email from the ASF dual-hosted git repository.

vincbeck pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new df00cd6dba2 Fix CreateWorkflowOperator if_exists=skip ARN construction (#66974)
df00cd6dba2 is described below

commit df00cd6dba2713465196b914175855d70e5731df
Author: Karthik Seshadri <[email protected]>
AuthorDate: Fri May 15 10:03:15 2026 -0700

    Fix CreateWorkflowOperator if_exists=skip ARN construction (#66974)
---
 .../providers/amazon/aws/operators/mwaa_serverless.py |  8 ++++++--
 .../system/amazon/aws/example_mwaa_serverless.py      | 10 ++++++++++
 .../unit/amazon/aws/operators/test_mwaa_serverless.py | 19 ++++++++++++++-----
 3 files changed, 30 insertions(+), 7 deletions(-)

diff --git a/providers/amazon/src/airflow/providers/amazon/aws/operators/mwaa_serverless.py b/providers/amazon/src/airflow/providers/amazon/aws/operators/mwaa_serverless.py
index cd629505bed..b8a31f7f1a5 100644
--- a/providers/amazon/src/airflow/providers/amazon/aws/operators/mwaa_serverless.py
+++ b/providers/amazon/src/airflow/providers/amazon/aws/operators/mwaa_serverless.py
@@ -151,8 +151,12 @@ class MwaaServerlessCreateWorkflowOperator(AwsBaseOperator[AwsBaseHook]):
         except ClientError as e:
             if e.response["Error"]["Code"] == "ConflictException" and 
self.if_exists == "skip":
                 self.log.info("Workflow %s already exists, skipping.", 
self.workflow_name)
-                response = 
self.hook.conn.get_workflow(WorkflowArn=self.workflow_name)
-                workflow_arn = response["WorkflowArn"]
+                resource_id = e.response["ResourceId"]
+                workflow_arn = (
+                    f"arn:{self.hook.conn_partition}:airflow-serverless"
+                    f":{self.hook.conn_region_name}:{self.hook.account_id}"
+                    f":workflow/{resource_id}"
+                )
             else:
                 raise
         self.log.info("Workflow %s: %s", self.workflow_name, workflow_arn)
diff --git a/providers/amazon/tests/system/amazon/aws/example_mwaa_serverless.py b/providers/amazon/tests/system/amazon/aws/example_mwaa_serverless.py
index d1d6f6bdcb1..c36801884cc 100644
--- a/providers/amazon/tests/system/amazon/aws/example_mwaa_serverless.py
+++ b/providers/amazon/tests/system/amazon/aws/example_mwaa_serverless.py
@@ -93,6 +93,15 @@ with DAG(
 
     workflow_arn = create_workflow.output
 
+    # Test idempotent create (if_exists="skip" path)
+    create_workflow_again = MwaaServerlessCreateWorkflowOperator(
+        task_id="create_workflow_again",
+        workflow_name=bucket_name,
+        definition_s3_location={"Bucket": bucket_name, "ObjectKey": 
"workflow.yaml"},
+        role_arn=role_arn,
+        if_exists="skip",
+    )
+
     # [START howto_operator_mwaa_serverless_start_workflow_run]
     start_workflow = MwaaServerlessStartWorkflowRunOperator(
         task_id="start_workflow",
@@ -156,6 +165,7 @@ with DAG(
         upload_workflow_yaml,
         workflow_arn,
         # TEST BODY
+        create_workflow_again,
         start_workflow,
         wait_for_run,
         update_workflow,
diff --git a/providers/amazon/tests/unit/amazon/aws/operators/test_mwaa_serverless.py b/providers/amazon/tests/unit/amazon/aws/operators/test_mwaa_serverless.py
index 69397ebd15c..9b2be23881b 100644
--- a/providers/amazon/tests/unit/amazon/aws/operators/test_mwaa_serverless.py
+++ b/providers/amazon/tests/unit/amazon/aws/operators/test_mwaa_serverless.py
@@ -115,18 +115,27 @@ class TestMwaaServerlessCreateWorkflowOperator:
         )
         assert result == WORKFLOW_ARN
 
+    @mock.patch.object(AwsBaseHook, "account_id", 
new_callable=mock.PropertyMock)
+    @mock.patch.object(AwsBaseHook, "conn_region_name", 
new_callable=mock.PropertyMock)
+    @mock.patch.object(AwsBaseHook, "conn_partition", 
new_callable=mock.PropertyMock)
     @mock.patch.object(AwsBaseHook, "conn", new_callable=mock.PropertyMock)
-    def test_execute_skip_existing(self, mock_conn):
+    def test_execute_skip_existing(self, mock_conn, mock_partition, 
mock_region, mock_account):
         mock_client = mock.MagicMock()
         mock_client.create_workflow.side_effect = ClientError(
-            {"Error": {"Code": "ConflictException", "Message": "Already 
exists"}},
+            {
+                "Error": {"Code": "ConflictException", "Message": "Already 
exists"},
+                "ResourceId": "test-workflow-aBcDeFgHiJ",
+                "ResourceType": "Workflow",
+            },
             "CreateWorkflow",
         )
-        mock_client.get_workflow.return_value = {"WorkflowArn": WORKFLOW_ARN}
         mock_conn.return_value = mock_client
+        mock_partition.return_value = "aws"
+        mock_region.return_value = "us-east-1"
+        mock_account.return_value = "123456789012"
 
         result = self.operator.execute({})
-        assert result == WORKFLOW_ARN
+        assert result == 
"arn:aws:airflow-serverless:us-east-1:123456789012:workflow/test-workflow-aBcDeFgHiJ"
 
     @mock.patch.object(AwsBaseHook, "conn", new_callable=mock.PropertyMock)
     def test_execute_fail_on_conflict(self, mock_conn):
@@ -144,7 +153,7 @@ class TestMwaaServerlessCreateWorkflowOperator:
         )
         mock_conn.return_value = mock_client
 
-        with pytest.raises(ClientError):
+        with pytest.raises(ClientError, match="ConflictException"):
             op.execute({})
 
     def test_template_fields(self):

Reply via email to