o-nikolas commented on a change in pull request #16685:
URL: https://github.com/apache/airflow/pull/16685#discussion_r664939787
##########
File path: airflow/providers/amazon/aws/operators/ecs.py
##########
@@ -264,6 +276,21 @@ def _start_task(self):
self.log.info('ECS Task started: %s', response)
self.arn = response['tasks'][0]['taskArn']
+ ecs_task_id = self.arn.split("/")[-1]
+ self.log.info(f"ECS task ID is: {ecs_task_id}")
+
+ if self.reattach:
+ # Save the task ARN in XCom to be able to reattach it if needed
+ self._xcom_set(context, key="ecs_task_arn", value=self.arn,
task_id=f"{self.task_id}_task_arn")
+
+ def _xcom_set(self, context, key, value, task_id):
Review comment:
This just contains a single line and is only used once, do we need the
helper? Or did you use this mostly to make patching in your testing easier
(which is completely valid IMHO)?
##########
File path: airflow/providers/amazon/aws/operators/ecs.py
##########
@@ -264,6 +276,21 @@ def _start_task(self):
self.log.info('ECS Task started: %s', response)
self.arn = response['tasks'][0]['taskArn']
+ ecs_task_id = self.arn.split("/")[-1]
+ self.log.info(f"ECS task ID is: {ecs_task_id}")
+
+ if self.reattach:
+ # Save the task ARN in XCom to be able to reattach it if needed
+ self._xcom_set(context, key="ecs_task_arn", value=self.arn,
task_id=f"{self.task_id}_task_arn")
Review comment:
Minor nit: the magic string `"ecs_task_arn"` is used in a couple places.
Perhaps put this in a constant. You could also use a templated string constant
for the task_id
##########
File path: airflow/providers/amazon/aws/operators/ecs.py
##########
@@ -274,15 +301,14 @@ def _try_reattach_task(self):
)
running_tasks = list_tasks_resp['taskArns']
- running_tasks_count = len(running_tasks)
- if running_tasks_count > 1:
- self.arn = running_tasks[0]
- self.log.warning('More than 1 ECS Task found. Reattaching to %s',
self.arn)
- elif running_tasks_count == 1:
- self.arn = running_tasks[0]
- self.log.info('Reattaching task: %s', self.arn)
+ # Check if the ECS task previously launched is already running
+ previous_task_arn =
self.xcom_pull(task_ids=f"{self.task_id}_task_arn", key="ecs_task_arn")
+ self.log.info(f"Previously launched task = {previous_task_arn}")
Review comment:
Nit: It probably adds noise to log this if we're not going to use it. I
think the log line inside the if statement below is sufficient.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]