[ https://issues.apache.org/jira/browse/AIRFLOW-3046?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Holden Karau's magical unicorn reassigned AIRFLOW-3046:
-------------------------------------------------------

    Assignee: Holden Karau's magical unicorn

> ECS Operator mistakenly reports success when task is killed due to EC2 host 
> termination
> ---------------------------------------------------------------------------------------
>
>                 Key: AIRFLOW-3046
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-3046
>             Project: Apache Airflow
>          Issue Type: Bug
>          Components: contrib, operators
>            Reporter: Dan MacTough
>            Assignee: Holden Karau's magical unicorn
>            Priority: Major
>
> We have ECS clusters made up of EC2 spot fleets. Among other things, this 
> means hosts can be terminated on short notice. When that happens, every task 
> (and its associated containers) on the host is terminated as well.
> We expect that when this happens to an Airflow task instance run via the ECS 
> Operator, that instance will be marked as failed and retried.
> Instead, it is marked as successful.
> As a result, the immediate downstream task fails, causing the scheduled DAG 
> run to fail.
> Here's an example of the Airflow log output when this happens:
> {noformat}
> [2018-09-12 01:02:02,712] {ecs_operator.py:112} INFO - ECS Task stopped, check status:
> {'tasks': [{'taskArn': 'arn:aws:ecs:us-east-1:111111111111:task/32d43a1d-fbc7-4659-815d-9133bde11cdc',
>   'clusterArn': 'arn:aws:ecs:us-east-1:111111111111:cluster/processing',
>   'taskDefinitionArn': 'arn:aws:ecs:us-east-1:111111111111:task-definition/foobar-testing_dataEngineering_rd:76',
>   'containerInstanceArn': 'arn:aws:ecs:us-east-1:111111111111:container-instance/7431f0a6-8fc5-4eff-8196-32f77d286a61',
>   'overrides': {'containerOverrides': [{'name': 'foobar-testing', 'command': ['./bin/generate-features.sh', '2018-09-11']}]},
>   'lastStatus': 'STOPPED', 'desiredStatus': 'STOPPED', 'cpu': '4096', 'memory': '60000',
>   'containers': [{'containerArn': 'arn:aws:ecs:us-east-1:111111111111:container/0d5cc553-f894-4f9a-b17c-9f80f7ce8d0a',
>     'taskArn': 'arn:aws:ecs:us-east-1:111111111111:task/32d43a1d-fbc7-4659-815d-9133bde11cdc',
>     'name': 'foobar-testing', 'lastStatus': 'RUNNING', 'networkBindings': [],
>     'networkInterfaces': [], 'healthStatus': 'UNKNOWN'}],
>   'startedBy': 'Airflow', 'version': 3,
>   'stoppedReason': 'Host EC2 (instance i-02cf23bbd5ae26194) terminated.',
>   'connectivity': 'CONNECTED',
>   'connectivityAt': datetime.datetime(2018, 9, 12, 0, 6, 30, 245000, tzinfo=tzlocal()),
>   'pullStartedAt': datetime.datetime(2018, 9, 12, 0, 6, 32, 748000, tzinfo=tzlocal()),
>   'pullStoppedAt': datetime.datetime(2018, 9, 12, 0, 6, 59, 748000, tzinfo=tzlocal()),
>   'createdAt': datetime.datetime(2018, 9, 12, 0, 6, 30, 245000, tzinfo=tzlocal()),
>   'startedAt': datetime.datetime(2018, 9, 12, 0, 7, 0, 748000, tzinfo=tzlocal()),
>   'stoppingAt': datetime.datetime(2018, 9, 12, 1, 2, 0, 91000, tzinfo=tzlocal()),
>   'stoppedAt': datetime.datetime(2018, 9, 12, 1, 2, 0, 91000, tzinfo=tzlocal()),
>   'group': 'family:foobar-testing_dataEngineering_rd', 'launchType': 'EC2',
>   'attachments': [], 'healthStatus': 'UNKNOWN'}],
>  'failures': [],
>  'ResponseMetadata': {'RequestId': '758c791f-b627-11e8-83f7-2b76f4796ed2', 'HTTPStatusCode': 200,
>   'HTTPHeaders': {'server': 'Server', 'date': 'Wed, 12 Sep 2018 01:02:02 GMT',
>    'content-type': 'application/x-amz-json-1.1', 'content-length': '1412',
>    'connection': 'keep-alive', 'x-amzn-requestid': '758c791f-b627-11e8-83f7-2b76f4796ed2'},
>   'RetryAttempts': 0}}
> {noformat}
> I believe the function that checks whether the task is successful needs at 
> least one more check. 
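> To see why the stock check passes here, note that the only container in the 
> log above is still in lastStatus 'RUNNING' and carries no exitCode, while the 
> evidence of the failure lives in the task-level stoppedReason, which the 
> stock logic never reads. A minimal sketch (simplified response; the PENDING 
> and error branches are omitted) of the container-only check:
> {code}
> # Trimmed-down version of the describe_tasks response from the log above.
> response = {
>     'tasks': [{
>         'lastStatus': 'STOPPED',
>         'stoppedReason': 'Host EC2 (instance i-02cf23bbd5ae26194) terminated.',
>         # The container was killed mid-run: never STOPPED, no exitCode.
>         'containers': [{'lastStatus': 'RUNNING'}],
>     }],
>     'failures': [],
> }
>
> def stock_check(response):
>     # Mirrors the upstream logic: only STOPPED containers with a non-zero
>     # exit code count as failures; stoppedReason is never consulted.
>     if response.get('failures'):
>         raise RuntimeError(response)
>     for task in response['tasks']:
>         for container in task['containers']:
>             if (container.get('lastStatus') == 'STOPPED'
>                     and container.get('exitCode', 0) != 0):
>                 raise RuntimeError('This task is not in success state')
>
> stock_check(response)  # returns silently, so Airflow marks the task success
> {code}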
> We are currently running a modified version of the ECS Operator that contains 
> the following {{_check_success_task}} function to address this failure 
> condition:
> {code}
>     def _check_success_task(self):
>         response = self.client.describe_tasks(
>             cluster=self.cluster,
>             tasks=[self.arn]
>         )
>         self.log.info('ECS Task stopped, check status: %s', response)
>         if len(response.get('failures', [])) > 0:
>             raise AirflowException(response)
>         for task in response['tasks']:
>             # Added check: when the host is terminated (e.g. a spot
>             # instance reclamation), the containers never reach STOPPED
>             # with an exit code, so inspect the task-level stoppedReason.
>             if 'terminated' in task.get('stoppedReason', '').lower():
>                 raise AirflowException(
>                     'The task was stopped because the host instance '
>                     'terminated: {}'.format(task.get('stoppedReason', '')))
>             for container in task['containers']:
>                 # .get() with a non-zero default avoids a KeyError when a
>                 # stopped container reports no exitCode at all.
>                 if container.get('lastStatus') == 'STOPPED' and \
>                         container.get('exitCode', 1) != 0:
>                     raise AirflowException(
>                         'This task is not in success state {}'.format(task))
>                 elif container.get('lastStatus') == 'PENDING':
>                     raise AirflowException(
>                         'This task is still pending {}'.format(task))
>                 elif 'error' in container.get('reason', '').lower():
>                     raise AirflowException(
>                         'This container encountered an error during '
>                         'launch: {}'.format(container.get('reason', '').lower()))
> {code}
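> A quick way to exercise the new branch is a test along these lines (a sketch 
> only: it assumes the modified operator is importable in place of the stock 
> contrib one, and the fixture values are made up):
> {code}
> from unittest import mock
>
> import pytest
> from airflow.exceptions import AirflowException
> from airflow.contrib.operators.ecs_operator import ECSOperator
>
>
> def test_host_termination_marks_task_failed():
>     operator = ECSOperator(
>         task_id='test', task_definition='foobar', cluster='processing',
>         overrides={})
>     # Stub the state that execute() would normally populate.
>     operator.arn = 'arn:aws:ecs:us-east-1:111111111111:task/abc'
>     operator.client = mock.Mock()
>     operator.client.describe_tasks.return_value = {
>         'tasks': [{
>             'stoppedReason': 'Host EC2 (instance i-02cf23bbd5ae26194) '
>                              'terminated.',
>             'containers': [{'lastStatus': 'RUNNING'}],
>         }],
>         'failures': [],
>     }
>     with pytest.raises(AirflowException):
>         operator._check_success_task()
> {code}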



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
