This is an automated email from the ASF dual-hosted git repository.
vincbeck pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new a998e28e68e Add skip_on_exit_code support to EcsRunTaskOperator
(#63274)
a998e28e68e is described below
commit a998e28e68ec822af45d622094b6c60395644b77
Author: Yunho Jung <[email protected]>
AuthorDate: Wed Mar 11 02:09:18 2026 +0900
Add skip_on_exit_code support to EcsRunTaskOperator (#63274)
Allow users to specify exit codes that should raise an
AirflowSkipException (marking the task as skipped) via the new
`skip_on_exit_code` parameter. This is consistent with the existing
behavior in DockerOperator and KubernetesPodOperator.
---
.../airflow/providers/amazon/aws/operators/ecs.py | 25 ++++++++++++---
.../tests/unit/amazon/aws/operators/test_ecs.py | 37 +++++++++++++++++++++-
2 files changed, 57 insertions(+), 5 deletions(-)
diff --git a/providers/amazon/src/airflow/providers/amazon/aws/operators/ecs.py
b/providers/amazon/src/airflow/providers/amazon/aws/operators/ecs.py
index 5e4a4b04f90..05f0102152d 100644
--- a/providers/amazon/src/airflow/providers/amazon/aws/operators/ecs.py
+++ b/providers/amazon/src/airflow/providers/amazon/aws/operators/ecs.py
@@ -18,7 +18,7 @@
from __future__ import annotations
import re
-from collections.abc import Sequence
+from collections.abc import Container, Sequence
from datetime import timedelta
from functools import cached_property
from time import sleep
@@ -39,7 +39,7 @@ from airflow.providers.amazon.aws.utils import
validate_execute_complete_event
from airflow.providers.amazon.aws.utils.identifiers import generate_uuid
from airflow.providers.amazon.aws.utils.mixins import aws_template_fields
from airflow.providers.amazon.aws.utils.task_log_fetcher import
AwsTaskLogFetcher
-from airflow.providers.common.compat.sdk import AirflowException, conf
+from airflow.providers.common.compat.sdk import AirflowException,
AirflowSkipException, conf
from airflow.utils.helpers import prune_dict
if TYPE_CHECKING:
@@ -394,6 +394,9 @@ class EcsRunTaskOperator(EcsBaseOperator):
:param deferrable: If True, the operator will wait asynchronously for the
job to complete.
This implies waiting for completion. This mode requires aiobotocore
module to be installed.
(default: False)
+ :param skip_on_exit_code: If task exits with this exit code, leave the task
+ in ``skipped`` state (default: None). If set to ``None``, any non-zero
+ exit code will be treated as a failure. Can be an int or a container
of ints.
:param do_xcom_push: If True, the operator will push the ECS task ARN to
XCom with key 'ecs_task_arn'.
Additionally, if logs are fetched, the last log message will be pushed
to XCom with the key 'return_value'. (default: False)
:param stop_task_on_failure: If True, attempt to stop the ECS task if the
Airflow task fails
@@ -461,6 +464,7 @@ class EcsRunTaskOperator(EcsBaseOperator):
# Set the default waiter duration to 70 days (attempts*delay)
# Airflow execution_timeout handles task timeout
deferrable: bool = conf.getboolean("operators", "default_deferrable",
fallback=False),
+ skip_on_exit_code: int | Container[int] | None = None,
stop_task_on_failure: bool = True,
**kwargs,
):
@@ -500,6 +504,13 @@ class EcsRunTaskOperator(EcsBaseOperator):
self.waiter_delay = waiter_delay
self.waiter_max_attempts = waiter_max_attempts
self.deferrable = deferrable
+ self.skip_on_exit_code = (
+ skip_on_exit_code
+ if isinstance(skip_on_exit_code, Container)
+ else [skip_on_exit_code]
+ if skip_on_exit_code is not None
+ else []
+ )
self.stop_task_on_failure = stop_task_on_failure
if self._aws_logs_enabled() and not self.wait_for_completion:
@@ -763,15 +774,21 @@ class EcsRunTaskOperator(EcsBaseOperator):
containers = task["containers"]
for container in containers:
if container.get("lastStatus") == "STOPPED" and
container.get("exitCode", 1) != 0:
+ exit_code = container.get("exitCode", 1)
+ if exit_code in self.skip_on_exit_code:
+ exception_cls: type[AirflowException] =
AirflowSkipException
+ else:
+ exception_cls = AirflowException
+
if self.task_log_fetcher:
last_logs = "\n".join(
self.task_log_fetcher.get_last_log_messages(self.number_logs_exception)
)
- raise AirflowException(
+ raise exception_cls(
f"This task is not in success state - last
{self.number_logs_exception} "
f"logs from Cloudwatch:\n{last_logs}"
)
- raise AirflowException(f"This task is not in success state
{task}")
+ raise exception_cls(f"This task is not in success state
{task}")
if container.get("lastStatus") == "PENDING":
raise AirflowException(f"This task is still pending
{task}")
if "error" in container.get("reason", "").lower():
diff --git a/providers/amazon/tests/unit/amazon/aws/operators/test_ecs.py
b/providers/amazon/tests/unit/amazon/aws/operators/test_ecs.py
index 5a866e36e11..946b086e0fe 100644
--- a/providers/amazon/tests/unit/amazon/aws/operators/test_ecs.py
+++ b/providers/amazon/tests/unit/amazon/aws/operators/test_ecs.py
@@ -38,7 +38,7 @@ from airflow.providers.amazon.aws.operators.ecs import (
from airflow.providers.amazon.aws.triggers.ecs import TaskDoneTrigger
from airflow.providers.amazon.aws.utils.task_log_fetcher import
AwsTaskLogFetcher
from airflow.providers.amazon.version_compat import NOTSET
-from airflow.providers.common.compat.sdk import AirflowException, TaskDeferred
+from airflow.providers.common.compat.sdk import AirflowException,
AirflowSkipException, TaskDeferred
from unit.amazon.aws.utils.test_template_fields import validate_template_fields
@@ -603,6 +603,41 @@ class TestEcsRunTaskOperator(EcsBaseTestCase):
self.ecs._check_success_task()
client_mock.describe_tasks.assert_called_once_with(cluster="c",
tasks=["arn"])
+ @mock.patch.object(EcsBaseOperator, "client")
+ def test_check_success_task_raises_skip_exception(self, client_mock):
+ self.ecs.arn = "arn"
+ self.ecs.skip_on_exit_code = [2]
+ client_mock.describe_tasks.return_value = {
+ "tasks": [{"containers": [{"name": "container-name", "lastStatus":
"STOPPED", "exitCode": 2}]}]
+ }
+ with pytest.raises(AirflowSkipException):
+ self.ecs._check_success_task()
+
+ @mock.patch.object(EcsBaseOperator, "client")
+
@mock.patch("airflow.providers.amazon.aws.utils.task_log_fetcher.AwsTaskLogFetcher")
+ def test_check_success_task_skip_exception_with_logs(self,
log_fetcher_mock, client_mock):
+ self.ecs.arn = "arn"
+ self.ecs.skip_on_exit_code = [2]
+ self.ecs.task_log_fetcher = log_fetcher_mock
+ log_fetcher_mock.get_last_log_messages.return_value = ["log1", "log2"]
+ client_mock.describe_tasks.return_value = {
+ "tasks": [{"containers": [{"name": "container-name", "lastStatus":
"STOPPED", "exitCode": 2}]}]
+ }
+ with pytest.raises(AirflowSkipException, match="This task is not in
success state"):
+ self.ecs._check_success_task()
+
+ @mock.patch.object(EcsBaseOperator, "client")
+ def
test_check_success_task_unmatched_exit_code_raises_airflow_exception(self,
client_mock):
+ """Exit codes not in skip_on_exit_code raise AirflowException."""
+ self.ecs.arn = "arn"
+ self.ecs.skip_on_exit_code = [2]
+ client_mock.describe_tasks.return_value = {
+ "tasks": [{"containers": [{"name": "container-name", "lastStatus":
"STOPPED", "exitCode": 1}]}]
+ }
+ with pytest.raises(AirflowException) as ctx:
+ self.ecs._check_success_task()
+ assert type(ctx.value) is AirflowException
+
@pytest.mark.parametrize(
("launch_type", "tags"),
[