This is an automated email from the ASF dual-hosted git repository.

vincbeck pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 1491ee166a1 Restrict EC2 cleanup to waiter errors and guard behind flag. (#61272)
1491ee166a1 is described below

commit 1491ee166a140d5e6e229930cfd094faf67c9e4e
Author: SameerMesiah97 <[email protected]>
AuthorDate: Thu Feb 5 15:20:44 2026 +0000

    Restrict EC2 cleanup to waiter errors and guard behind flag. (#61272)
    
    Co-authored-by: Sameer Mesiah <[email protected]>
---
 .../airflow/providers/amazon/aws/operators/ec2.py  | 27 ++++++++++++++++------
 .../tests/unit/amazon/aws/operators/test_ec2.py    |  2 ++
 2 files changed, 22 insertions(+), 7 deletions(-)

diff --git a/providers/amazon/src/airflow/providers/amazon/aws/operators/ec2.py b/providers/amazon/src/airflow/providers/amazon/aws/operators/ec2.py
index acacaaf779f..f5364940248 100644
--- a/providers/amazon/src/airflow/providers/amazon/aws/operators/ec2.py
+++ b/providers/amazon/src/airflow/providers/amazon/aws/operators/ec2.py
@@ -20,6 +20,8 @@ from __future__ import annotations
 from collections.abc import Sequence
 from typing import TYPE_CHECKING, Any
 
+from botocore.exceptions import WaiterError
+
 from airflow.providers.amazon.aws.hooks.ec2 import EC2Hook
 from airflow.providers.amazon.aws.links.ec2 import (
     EC2InstanceDashboardLink,
@@ -172,6 +174,8 @@ class EC2CreateInstanceOperator(AwsBaseOperator[EC2Hook]):
     :param config: Dictionary for arbitrary parameters to the boto3 
run_instances call.
     :param wait_for_completion: If True, the operator will wait for the 
instance to be
         in the `running` state before returning.
+    :param terminate_instance_on_failure: If True, attempt to terminate the 
EC2 instance if the
+        Airflow task fails after the instance has been created. Defaults to 
True.
     """
 
     aws_hook_class = EC2Hook
@@ -196,6 +200,7 @@ class EC2CreateInstanceOperator(AwsBaseOperator[EC2Hook]):
         max_attempts: int = 20,
         config: dict | None = None,
         wait_for_completion: bool = False,
+        terminate_instance_on_failure: bool = True,
         **kwargs,
     ):
         super().__init__(**kwargs)
@@ -206,6 +211,7 @@ class EC2CreateInstanceOperator(AwsBaseOperator[EC2Hook]):
         self.max_attempts = max_attempts
         self.config = config or {}
         self.wait_for_completion = wait_for_completion
+        self.terminate_instance_on_failure = terminate_instance_on_failure
 
     @property
     def _hook_parameters(self) -> dict[str, Any]:
@@ -245,18 +251,25 @@ class EC2CreateInstanceOperator(AwsBaseOperator[EC2Hook]):
             return instance_ids
 
         # Best-effort cleanup when post-creation steps fail (e.g. 
IAM/permission errors).
-        except Exception:
+        except WaiterError:
             self.log.exception(
-                "Exception after EC2 instance creation; attempting cleanup for 
instances %s",
+                "Exception after creation of EC2 instances: %s.",
                 instance_ids,
             )
-            try:
-                self.hook.terminate_instances(instance_ids=instance_ids)
-            except Exception:
-                self.log.exception(
-                    "Failed to cleanup EC2 instances %s after task failure",
+            # terminate_instance_on_failure defaults to True to prevent 
orphaned EC2 instances.
+            if self.terminate_instance_on_failure:
+                self.log.info(
+                    "Attempting termination of instances: %s.",
                     instance_ids,
                 )
+
+                try:
+                    self.hook.terminate_instances(instance_ids=instance_ids)
+                except Exception:
+                    self.log.exception(
+                        "Failed to terminate EC2 instances: %s after task 
failure.",
+                        instance_ids,
+                    )
             raise
 
     def on_kill(self) -> None:
diff --git a/providers/amazon/tests/unit/amazon/aws/operators/test_ec2.py b/providers/amazon/tests/unit/amazon/aws/operators/test_ec2.py
index 89eae7cdd0f..8faa74c565a 100644
--- a/providers/amazon/tests/unit/amazon/aws/operators/test_ec2.py
+++ b/providers/amazon/tests/unit/amazon/aws/operators/test_ec2.py
@@ -107,6 +107,7 @@ class TestEC2CreateInstanceOperator(BaseEc2TestClass):
             task_id="test_cleanup_on_error",
             image_id=self._get_image_id(ec2_hook),
             wait_for_completion=True,
+            terminate_instance_on_failure=True,
         )
 
         waiter_error = WaiterError(
@@ -140,6 +141,7 @@ class TestEC2CreateInstanceOperator(BaseEc2TestClass):
             task_id="test_cleanup_failure_does_not_mask_error",
             image_id=self._get_image_id(ec2_hook),
             wait_for_completion=True,
+            terminate_instance_on_failure=True,
         )
 
         waiter_error = WaiterError(

Reply via email to