This is an automated email from the ASF dual-hosted git repository.
ferruzzi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 230da3e399c EcsRunTaskOperator fails when no containers are provided in the response (#51692)
230da3e399c is described below
commit 230da3e399c26699daad9fd95a413a844f7cb13f
Author: Dominik <[email protected]>
AuthorDate: Sat Sep 6 05:52:39 2025 +0200
EcsRunTaskOperator fails when no containers are provided in the response (#51692)
* Added logic to safely access the container name only if it's required, and to poll multiple times in case the task is not yet active
* Adjusted the test case to better reflect the actual ECS response
---
.../airflow/providers/amazon/aws/operators/ecs.py | 25 +++++++--
.../tests/unit/amazon/aws/operators/test_ecs.py | 62 ++++++++++++++++++++++
2 files changed, 84 insertions(+), 3 deletions(-)
diff --git a/providers/amazon/src/airflow/providers/amazon/aws/operators/ecs.py b/providers/amazon/src/airflow/providers/amazon/aws/operators/ecs.py
index 30baf423986..0152c465455 100644
--- a/providers/amazon/src/airflow/providers/amazon/aws/operators/ecs.py
+++ b/providers/amazon/src/airflow/providers/amazon/aws/operators/ecs.py
@@ -21,6 +21,7 @@ import re
from collections.abc import Sequence
from datetime import timedelta
from functools import cached_property
+from time import sleep
from typing import TYPE_CHECKING, Any
from airflow.configuration import conf
@@ -629,10 +630,22 @@ class EcsRunTaskOperator(EcsBaseOperator):
self.log.info("ECS Task started: %s", response)
self.arn = response["tasks"][0]["taskArn"]
- if not self.container_name:
- self.container_name = response["tasks"][0]["containers"][0]["name"]
self.log.info("ECS task ID is: %s", self._get_ecs_task_id(self.arn))
+ if not self.container_name and (self.awslogs_group and
self.awslogs_stream_prefix):
+ backoff_schedule = [10, 30]
+ for delay in backoff_schedule:
+ sleep(delay)
+ response = self.client.describe_tasks(cluster=self.cluster,
tasks=[self.arn])
+ containers = response["tasks"][0].get("containers", [])
+ if containers:
+ self.container_name = containers[0]["name"]
+ if self.container_name:
+ break
+
+ if not self.container_name:
+ self.log.info("Could not find container name, required for the
log stream after 2 tries")
+
def _try_reattach_task(self, started_by: str):
if not started_by:
raise AirflowException("`started_by` should not be empty or None")
@@ -666,7 +679,13 @@ class EcsRunTaskOperator(EcsBaseOperator):
        return self.awslogs_group and self.awslogs_stream_prefix
    def _get_logs_stream_name(self) -> str:
-        if (
+        if not self.container_name and self.awslogs_stream_prefix and "/" not in self.awslogs_stream_prefix:
+            self.log.warning(
+                "Container name could not be inferred and awslogs_stream_prefix '%s' does not contain '/'. "
+                "This may cause issues when extracting logs from Cloudwatch.",
+                self.awslogs_stream_prefix,
+            )
+        elif (
            self.awslogs_stream_prefix
            and self.container_name
            and not self.awslogs_stream_prefix.endswith(f"/{self.container_name}")
diff --git a/providers/amazon/tests/unit/amazon/aws/operators/test_ecs.py b/providers/amazon/tests/unit/amazon/aws/operators/test_ecs.py
index efc6d33630e..23d3fdc9282 100644
--- a/providers/amazon/tests/unit/amazon/aws/operators/test_ecs.py
+++ b/providers/amazon/tests/unit/amazon/aws/operators/test_ecs.py
@@ -81,6 +81,20 @@ RESPONSE_WITHOUT_FAILURES = {
        }
    ],
}
+RESPONSE_WITHOUT_NAME = {
+    "failures": [],
+    "tasks": [
+        {
+            "containers": [],
+            "desiredStatus": "RUNNING",
+            "lastStatus": "PENDING",
+            "taskArn": f"arn:aws:ecs:us-east-1:012345678910:task/{TASK_ID}",
+            "taskDefinitionArn": "arn:aws:ecs:us-east-1:012345678910:task-definition/hello_world:11",
+        }
+    ],
+}
+
+
WAITERS_TEST_CASES = [
    pytest.param(None, None, id="default-values"),
    pytest.param(3.14, None, id="set-delay-only"),
@@ -788,6 +802,54 @@ class TestEcsRunTaskOperator(EcsBaseTestCase):
        assert self.ecs._get_logs_stream_name().startswith(f"{prefix}/{container_name}/")
+    @mock.patch.object(EcsBaseOperator, "client")
+    @mock.patch("airflow.providers.amazon.aws.operators.ecs.sleep", return_value=None)
+    def test_container_name_not_set(self, sleep_mock, client_mock):
+        self.set_up_operator(
+            awslogs_group="awslogs-group",
+            awslogs_stream_prefix="prefix",
+            container_name=None,
+        )
+        client_mock.run_task.return_value = RESPONSE_WITHOUT_NAME
+        client_mock.describe_tasks.side_effect = [
+            {"tasks": [{"containers": []}]},
+            {"tasks": [{"containers": [{"name": "resolved-container"}]}]},
+        ]
+        self.ecs._start_task()
+        assert client_mock.describe_tasks.call_count == 2
+        assert self.ecs.container_name == "resolved-container"
+
+    @mock.patch.object(EcsBaseOperator, "client")
+    @mock.patch.object(EcsBaseOperator, "log")
+    @mock.patch("airflow.providers.amazon.aws.operators.ecs.sleep", return_value=None)
+    def test_container_name_resolution_fails_logs_message(self, sleep_mock, log_mock, client_mock):
+        self.set_up_operator(
+            awslogs_group="test-group",
+            awslogs_stream_prefix="prefix",
+            container_name=None,
+        )
+        client_mock.run_task.return_value = RESPONSE_WITHOUT_NAME
+        client_mock.describe_tasks.return_value = {"tasks": [{"containers": [{"name": None}]}]}
+
+        self.ecs._start_task()
+
+        assert client_mock.describe_tasks.call_count == 2
+        assert self.ecs.container_name is None
+        log_mock.info.assert_called_with(
+            "Could not find container name, required for the log stream after 2 tries"
+        )
+
+    @mock.patch.object(EcsBaseOperator, "client")
+    def test_container_name_not_polled(self, client_mock):
+        self.set_up_operator(
+            awslogs_group=None,
+            awslogs_stream_prefix=None,
+            container_name=None,
+        )
+        client_mock.run_task.return_value = RESPONSE_WITHOUT_NAME
+        self.ecs._start_task()
+        assert client_mock.describe_tasks.call_count == 0
+
class TestEcsCreateClusterOperator(EcsBaseTestCase):
    @pytest.mark.parametrize("waiter_delay, waiter_max_attempts", WAITERS_TEST_CASES)
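Below is a minimal, illustrative operator configuration for the scenario this change targets: CloudWatch log streaming enabled (awslogs_group and awslogs_stream_prefix set) while container_name is left unset, so the operator has to resolve the name from DescribeTasks. All cluster, task-definition, and log names are placeholders, not values taken from the commit.

from airflow.providers.amazon.aws.operators.ecs import EcsRunTaskOperator

run_task = EcsRunTaskOperator(
    task_id="run_ecs_task",
    cluster="example-cluster",          # placeholder cluster name
    task_definition="hello_world:11",   # placeholder, mirrors the test fixture
    launch_type="FARGATE",
    overrides={"containerOverrides": []},
    # container_name is deliberately omitted: with this change the operator polls
    # describe_tasks (sleeping 10s, then 30s) until the container name appears,
    # instead of failing when the initial run_task response has no containers yet.
    awslogs_group="awslogs-group",      # CloudWatch log group (placeholder)
    awslogs_stream_prefix="prefix",     # the resolved container name is appended as "prefix/<container>"
    awslogs_region="us-east-1",
)

Before this fix, a run_task response like RESPONSE_WITHOUT_NAME above (empty "containers" list while the task is still PENDING) caused an IndexError when indexing containers[0]; the bounded polling loop tolerates that startup window and only logs a message if the name still cannot be resolved.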