[ https://issues.apache.org/jira/browse/AIRFLOW-3046?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Holden Karau's magical unicorn reassigned AIRFLOW-3046:
-------------------------------------------------------

    Assignee: Holden Karau's magical unicorn

> ECS Operator mistakenly reports success when task is killed due to EC2 host
> termination
> ---------------------------------------------------------------------------
>
>                 Key: AIRFLOW-3046
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-3046
>             Project: Apache Airflow
>          Issue Type: Bug
>          Components: contrib, operators
>            Reporter: Dan MacTough
>            Assignee: Holden Karau's magical unicorn
>            Priority: Major
>
> We have ECS clusters made up of EC2 spot fleets. Among other things, this
> means hosts can be terminated on short notice. When this happens, all tasks
> (and their associated containers) are terminated as well.
> We expect that when this happens to Airflow task instances using the ECS
> Operator, those instances will be marked as failed and retried.
> Instead, they are marked as successful.
> As a result, the immediate downstream task fails, causing the scheduled DAG
> run to fail.
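For context, the failure mode can be reproduced in isolation. If the operator's success check only inspects the response's `failures` list and each container's exit code (an assumption about the operator's behavior at the time, not a copy of its code), it never fires for a host-terminated task: `failures` is empty and the container is still reported as `RUNNING` with no `exitCode` at all. A minimal sketch, with the `response` dict abridged from the log below:

```python
# Abridged describe_tasks response for a task whose EC2 spot host was
# terminated. Note: 'failures' is empty and the container never reached
# STOPPED, so it carries no 'exitCode' key.
response = {
    'tasks': [{
        'lastStatus': 'STOPPED',
        'stoppedReason': 'Host EC2 (instance i-02cf23bbd5ae26194) terminated.',
        'containers': [{'name': 'foobar-testing', 'lastStatus': 'RUNNING'}],
    }],
    'failures': [],
}


def naive_check(response):
    """Hypothetical success check that only looks at failures and exit codes."""
    if response.get('failures'):
        raise RuntimeError(response)
    for task in response['tasks']:
        for container in task['containers']:
            # Only a STOPPED container with a non-zero exit code counts as
            # a failure here -- a RUNNING container without 'exitCode' passes.
            if container.get('lastStatus') == 'STOPPED' and \
                    container.get('exitCode', 0) != 0:
                raise RuntimeError('task failed: {}'.format(task))
    # Falls through: the host-terminated task looks "successful".


naive_check(response)  # no exception raised, so the task is marked successful
```

Because nothing in this response matches either condition, the check returns normally and the task instance is recorded as a success, which is exactly the bug described above.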
> Here's an example of the Airflow log output when this happens:
> {noformat}
> [2018-09-12 01:02:02,712] {ecs_operator.py:112} INFO - ECS Task stopped, check status:
> {'tasks': [{'taskArn': 'arn:aws:ecs:us-east-1:111111111111:task/32d43a1d-fbc7-4659-815d-9133bde11cdc',
>    'clusterArn': 'arn:aws:ecs:us-east-1:111111111111:cluster/processing',
>    'taskDefinitionArn': 'arn:aws:ecs:us-east-1:111111111111:task-definition/foobar-testing_dataEngineering_rd:76',
>    'containerInstanceArn': 'arn:aws:ecs:us-east-1:111111111111:container-instance/7431f0a6-8fc5-4eff-8196-32f77d286a61',
>    'overrides': {'containerOverrides': [{'name': 'foobar-testing', 'command': ['./bin/generate-features.sh', '2018-09-11']}]},
>    'lastStatus': 'STOPPED', 'desiredStatus': 'STOPPED', 'cpu': '4096', 'memory': '60000',
>    'containers': [{'containerArn': 'arn:aws:ecs:us-east-1:111111111111:container/0d5cc553-f894-4f9a-b17c-9f80f7ce8d0a',
>       'taskArn': 'arn:aws:ecs:us-east-1:111111111111:task/32d43a1d-fbc7-4659-815d-9133bde11cdc',
>       'name': 'foobar-testing', 'lastStatus': 'RUNNING', 'networkBindings': [],
>       'networkInterfaces': [], 'healthStatus': 'UNKNOWN'}],
>    'startedBy': 'Airflow', 'version': 3,
>    'stoppedReason': 'Host EC2 (instance i-02cf23bbd5ae26194) terminated.',
>    'connectivity': 'CONNECTED',
>    'connectivityAt': datetime.datetime(2018, 9, 12, 0, 6, 30, 245000, tzinfo=tzlocal()),
>    'pullStartedAt': datetime.datetime(2018, 9, 12, 0, 6, 32, 748000, tzinfo=tzlocal()),
>    'pullStoppedAt': datetime.datetime(2018, 9, 12, 0, 6, 59, 748000, tzinfo=tzlocal()),
>    'createdAt': datetime.datetime(2018, 9, 12, 0, 6, 30, 245000, tzinfo=tzlocal()),
>    'startedAt': datetime.datetime(2018, 9, 12, 0, 7, 0, 748000, tzinfo=tzlocal()),
>    'stoppingAt': datetime.datetime(2018, 9, 12, 1, 2, 0, 91000, tzinfo=tzlocal()),
>    'stoppedAt': datetime.datetime(2018, 9, 12, 1, 2, 0, 91000, tzinfo=tzlocal()),
>    'group': 'family:foobar-testing_dataEngineering_rd', 'launchType': 'EC2',
>    'attachments': [], 'healthStatus': 'UNKNOWN'}],
>  'failures': [],
>  'ResponseMetadata': {'RequestId': '758c791f-b627-11e8-83f7-2b76f4796ed2',
>    'HTTPStatusCode': 200,
>    'HTTPHeaders': {'server': 'Server', 'date': 'Wed, 12 Sep 2018 01:02:02 GMT',
>      'content-type': 'application/x-amz-json-1.1', 'content-length': '1412',
>      'connection': 'keep-alive', 'x-amzn-requestid': '758c791f-b627-11e8-83f7-2b76f4796ed2'},
>    'RetryAttempts': 0}}
> {noformat}
> I believe the function that checks whether the task succeeded needs at least
> one more check.
> We are currently running a modified version of the ECS Operator with the
> following {{_check_success_task}} function to address this failure condition:
> {code}
>     def _check_success_task(self):
>         response = self.client.describe_tasks(
>             cluster=self.cluster,
>             tasks=[self.arn]
>         )
>         self.log.info('ECS Task stopped, check status: %s', response)
>         if len(response.get('failures', [])) > 0:
>             raise AirflowException(response)
>         for task in response['tasks']:
>             if 'terminated' in task.get('stoppedReason', '').lower():
>                 raise AirflowException(
>                     'The task was stopped because the host instance terminated: {}'.format(
>                         task.get('stoppedReason', '')))
>             containers = task['containers']
>             for container in containers:
>                 if container.get('lastStatus') == 'STOPPED' and \
>                         container['exitCode'] != 0:
>                     raise AirflowException(
>                         'This task is not in success state {}'.format(task))
>                 elif container.get('lastStatus') == 'PENDING':
>                     raise AirflowException(
>                         'This task is still pending {}'.format(task))
>                 elif 'error' in container.get('reason', '').lower():
>                     raise AirflowException(
>                         'This container encountered an error during launch: {}'.format(
>                             container.get('reason', '').lower()))
> {code}

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
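To show the proposed check catching this case, here is a self-contained sketch that feeds a stubbed `describe_tasks` response, with the same `stoppedReason` as the log above, through the patched logic. The `StubClient` class, the module-level `check_success_task` rewrite of the method, and the stand-in `AirflowException` are all assumptions made for the example, not part of the patch:

```python
class AirflowException(Exception):
    """Stand-in for airflow.exceptions.AirflowException (assumption)."""


class StubClient:
    """Minimal stand-in for a boto3 ECS client, returning a canned response."""

    def describe_tasks(self, cluster, tasks):
        return {
            'tasks': [{
                'taskArn': tasks[0],
                'lastStatus': 'STOPPED',
                'stoppedReason': 'Host EC2 (instance i-02cf23bbd5ae26194) terminated.',
                'containers': [{'name': 'foobar-testing', 'lastStatus': 'RUNNING'}],
            }],
            'failures': [],
        }


def check_success_task(client, cluster, arn):
    """Module-level version of the patched _check_success_task, for demonstration."""
    response = client.describe_tasks(cluster=cluster, tasks=[arn])
    if len(response.get('failures', [])) > 0:
        raise AirflowException(response)
    for task in response['tasks']:
        # The added check: a host-instance termination sets stoppedReason,
        # even though no container ever reports a non-zero exit code.
        if 'terminated' in task.get('stoppedReason', '').lower():
            raise AirflowException(
                'The task was stopped because the host instance terminated: {}'.format(
                    task.get('stoppedReason', '')))
        for container in task['containers']:
            if container.get('lastStatus') == 'STOPPED' and \
                    container.get('exitCode') != 0:
                raise AirflowException(
                    'This task is not in success state {}'.format(task))


try:
    check_success_task(StubClient(), 'processing',
                       'arn:aws:ecs:us-east-1:111111111111:task/example')
except AirflowException as exc:
    print(exc)  # the host-terminated task now surfaces as a failure
```

With the `stoppedReason` check in place, the task instance raises instead of returning normally, so Airflow marks it failed and its retry policy applies.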