[ https://issues.apache.org/jira/browse/AIRFLOW-6527?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Kaxil Naik updated AIRFLOW-6527:
--------------------------------
    Fix Version/s: 1.10.8

> Error sending Celery task:Timeout in send_task_to_executor
> ----------------------------------------------------------
>
>                 Key: AIRFLOW-6527
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-6527
>             Project: Apache Airflow
>          Issue Type: Bug
>          Components: scheduler
>    Affects Versions: 1.10.7
>            Reporter: Qian Yu
>            Priority: Major
>             Fix For: 2.0.0, 1.10.8
>
>
> We use Airflow with CeleryExecutor and a Redis broker. Our Airflow scheduler
> often encounters this {{AirflowTaskTimeout}} error:
> - It happens in {{send_task_to_executor()}}.
> - It only happens occasionally.
> - Retrying the failed task a few times always works.
> - It affects at least 1.10.6 and 1.10.7, and possibly other versions too.
>
> Possible cause:
> Our Airflow venv and dags_folder are on an NFS mount because we want to keep
> the various Airflow services in sync.
> The NFS mount can sometimes be slow. That slows down the module imports that
> happen inside the call (in the traceback below, Celery lazily imports
> {{celery.app.amqp}} from within {{apply_async()}}), so
> {{send_task_to_executor()}} takes more than 2 seconds.
>
> Other people with similar-looking problems:
> The following issue is now closed. It's not clear to me whether or how the
> user resolved it.
> https://github.com/bitnami/bitnami-docker-airflow-scheduler/issues/1
> Another user asked about it on the mailing list. The question was not answered.
> https://www.mail-archive.com/dev@airflow.apache.org/msg01093.html
>
> Proposed workarounds:
> - Make this {{timeout(seconds=2)}} configurable, e.g. by adding a
> {{send_task_timeout}} option to the {{[celery]}} section of airflow.cfg. Since
> 2 seconds seems too short, we could set it to something like 15 seconds to
> make the error much less likely.
> - Move the Airflow venv to local disk. This makes it inconvenient to keep the
> Airflow installation in sync across multiple hosts, though.
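A minimal, stdlib-only sketch of the first proposed workaround. Assumptions: {{send_task_timeout}} is the reporter's proposed option name, not an existing Airflow setting; the {{timeout}} class below only approximates {{airflow.utils.timeout}}, which (per the traceback) raises from a signal handler while the wrapped call is still running; {{SendTaskTimeout}}, {{read_send_task_timeout}}, {{slow_apply_async}} and {{send_with_deadline}} are hypothetical names; and the slow NFS import is simulated with {{time.sleep}}.

```python
import configparser
import signal
import time


class SendTaskTimeout(Exception):
    """Hypothetical stand-in for airflow.exceptions.AirflowTaskTimeout."""


class timeout:
    """SIGALRM-based deadline, loosely modelled on airflow.utils.timeout.

    Unix only, and only from the main thread, because CPython runs
    signal handlers in the main thread.
    """

    def __init__(self, seconds, error_message="Timeout"):
        self.seconds = seconds
        self.error_message = error_message
        self._old_handler = None

    def handle_timeout(self, signum, frame):
        # Raised inside whatever code is running when the alarm fires,
        # e.g. an os.stat() call made by importlib on a slow mount.
        raise SendTaskTimeout(self.error_message)

    def __enter__(self):
        self._old_handler = signal.signal(signal.SIGALRM, self.handle_timeout)
        # setitimer accepts fractional seconds, unlike signal.alarm().
        signal.setitimer(signal.ITIMER_REAL, self.seconds)
        return self

    def __exit__(self, exc_type, exc, tb):
        signal.setitimer(signal.ITIMER_REAL, 0)  # cancel a pending alarm
        previous = self._old_handler
        signal.signal(signal.SIGALRM,
                      previous if previous is not None else signal.SIG_DFL)
        return False  # let any exception propagate


def read_send_task_timeout(cfg_text, default=2.0):
    """Read the proposed (hypothetical) [celery] send_task_timeout option."""
    parser = configparser.ConfigParser()
    parser.read_string(cfg_text)
    # fallback covers both a missing option and a missing [celery] section.
    return parser.getfloat("celery", "send_task_timeout", fallback=default)


def slow_apply_async(delay):
    """Hypothetical stand-in for task.apply_async() stalled on slow NFS I/O."""
    time.sleep(delay)
    return "sent"


def send_with_deadline(delay, seconds):
    """Mimics the try/except shape of send_task_to_executor()."""
    try:
        with timeout(seconds=seconds):
            return slow_apply_async(delay)
    except SendTaskTimeout as exc:
        return "failed: {}".format(exc)


if __name__ == "__main__":
    cfg = "[celery]\nsend_task_timeout = 15\n"
    print(read_send_task_timeout(cfg))                 # 15.0
    print(read_send_task_timeout(""))                  # 2.0
    print(send_with_deadline(delay=0.5, seconds=0.2))  # failed: Timeout
    print(send_with_deadline(delay=0.1, seconds=1.0))  # sent
```

With the option set to 15, {{read_send_task_timeout}} returns 15.0; with no option it falls back to 2.0, matching today's hardcoded value. {{send_with_deadline(delay=0.5, seconds=0.2)}} fails because the alarm fires mid-sleep, while {{send_with_deadline(delay=0.1, seconds=1.0)}} completes. A longer deadline makes the error rarer but does not remove the underlying slow I/O, which is why the second workaround (local disk) addresses the cause rather than the symptom.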
> {code}
> Jan 09 22:46:59 scheduler_host airflow[18882]: [2020-01-09 22:46:59,763] {celery_executor.py:224} ERROR - Error sending Celery task:Timeout, PID: 27724
> Jan 09 22:46:59 scheduler_host airflow[18882]: Celery Task ID: ('example_daily', 'example_sensor1', datetime.datetime(2020, 1, 9, 0, 0, tzinfo=<TimezoneInfo [UTC, GMT, +00:00:00, STD]>), 1)
> Jan 09 22:46:59 scheduler_host airflow[18882]: Traceback (most recent call last):
> Jan 09 22:46:59 scheduler_host airflow[18882]:   File "/mnt/nfs1/airflow_venv/lib/python3.6/site-packages/kombu/utils/objects.py", line 42, in __get__
> Jan 09 22:46:59 scheduler_host airflow[18882]:     return obj.__dict__[self.__name__]
> Jan 09 22:46:59 scheduler_host airflow[18882]: KeyError: 'amqp'
> Jan 09 22:46:59 scheduler_host airflow[18882]: During handling of the above exception, another exception occurred:
> Jan 09 22:46:59 scheduler_host airflow[18882]: Traceback (most recent call last):
> Jan 09 22:46:59 scheduler_host airflow[18882]:   File "/mnt/nfs1/airflow_venv/lib/python3.6/site-packages/airflow/executors/celery_executor.py", line 118, in send_task_to_executor
> Jan 09 22:46:59 scheduler_host airflow[18882]:     result = task.apply_async(args=[command], queue=queue)
> Jan 09 22:46:59 scheduler_host airflow[18882]:   File "/mnt/nfs1/airflow_venv/lib/python3.6/site-packages/celery/app/task.py", line 570, in apply_async
> Jan 09 22:46:59 scheduler_host airflow[18882]:     **options
> Jan 09 22:46:59 scheduler_host airflow[18882]:   File "/mnt/nfs1/airflow_venv/lib/python3.6/site-packages/celery/app/base.py", line 712, in send_task
> Jan 09 22:46:59 scheduler_host airflow[18882]:     amqp = self.amqp
> Jan 09 22:46:59 scheduler_host airflow[18882]:   File "/mnt/nfs1/airflow_venv/lib/python3.6/site-packages/kombu/utils/objects.py", line 44, in __get__
> Jan 09 22:46:59 scheduler_host airflow[18882]:     value = obj.__dict__[self.__name__] = self.__get(obj)
> Jan 09 22:46:59 scheduler_host airflow[18882]:   File "/mnt/nfs1/airflow_venv/lib/python3.6/site-packages/celery/app/base.py", line 1202, in amqp
> Jan 09 22:46:59 scheduler_host airflow[18882]:     return instantiate(self.amqp_cls, app=self)
> Jan 09 22:46:59 scheduler_host airflow[18882]:   File "/mnt/nfs1/airflow_venv/lib/python3.6/site-packages/celery/utils/imports.py", line 55, in instantiate
> Jan 09 22:46:59 scheduler_host airflow[18882]:     return symbol_by_name(name)(*args, **kwargs)
> Jan 09 22:46:59 scheduler_host airflow[18882]:   File "/mnt/nfs1/airflow_venv/lib/python3.6/site-packages/kombu/utils/imports.py", line 57, in symbol_by_name
> Jan 09 22:46:59 scheduler_host airflow[18882]:     module = imp(module_name, package=package, **kwargs)
> Jan 09 22:46:59 scheduler_host airflow[18882]:   File "/usr/lib/python3.6/importlib/__init__.py", line 126, in import_module
> Jan 09 22:46:59 scheduler_host airflow[18882]:     return _bootstrap._gcd_import(name[level:], package, level)
> Jan 09 22:46:59 scheduler_host airflow[18882]:   File "<frozen importlib._bootstrap>", line 994, in _gcd_import
> Jan 09 22:46:59 scheduler_host airflow[18882]:   File "<frozen importlib._bootstrap>", line 971, in _find_and_load
> Jan 09 22:46:59 scheduler_host airflow[18882]:   File "<frozen importlib._bootstrap>", line 955, in _find_and_load_unlocked
> Jan 09 22:46:59 scheduler_host airflow[18882]:   File "<frozen importlib._bootstrap>", line 665, in _load_unlocked
> Jan 09 22:46:59 scheduler_host airflow[18882]:   File "<frozen importlib._bootstrap_external>", line 678, in exec_module
> Jan 09 22:46:59 scheduler_host airflow[18882]:   File "<frozen importlib._bootstrap>", line 219, in _call_with_frames_removed
> Jan 09 22:46:59 scheduler_host airflow[18882]:   File "/mnt/nfs1/airflow_venv/lib/python3.6/site-packages/celery/app/amqp.py", line 23, in <module>
> Jan 09 22:46:59 scheduler_host airflow[18882]:     from . import routes as _routes
> Jan 09 22:46:59 scheduler_host airflow[18882]:   File "<frozen importlib._bootstrap>", line 971, in _find_and_load
> Jan 09 22:46:59 scheduler_host airflow[18882]:   File "<frozen importlib._bootstrap>", line 951, in _find_and_load_unlocked
> Jan 09 22:46:59 scheduler_host airflow[18882]:   File "<frozen importlib._bootstrap>", line 894, in _find_spec
> Jan 09 22:46:59 scheduler_host airflow[18882]:   File "<frozen importlib._bootstrap_external>", line 1157, in find_spec
> Jan 09 22:46:59 scheduler_host airflow[18882]:   File "<frozen importlib._bootstrap_external>", line 1129, in _get_spec
> Jan 09 22:46:59 scheduler_host airflow[18882]:   File "<frozen importlib._bootstrap_external>", line 1271, in find_spec
> Jan 09 22:46:59 scheduler_host airflow[18882]:   File "<frozen importlib._bootstrap_external>", line 96, in _path_isfile
> Jan 09 22:46:59 scheduler_host airflow[18882]:   File "<frozen importlib._bootstrap_external>", line 88, in _path_is_mode_type
> Jan 09 22:46:59 scheduler_host airflow[18882]:   File "<frozen importlib._bootstrap_external>", line 82, in _path_stat
> Jan 09 22:46:59 scheduler_host airflow[18882]:   File "/mnt/nfs1/airflow_venv/lib/python3.6/site-packages/airflow/utils/timeout.py", line 43, in handle_timeout
> Jan 09 22:46:59 scheduler_host airflow[18882]:     raise AirflowTaskTimeout(self.error_message)
> Jan 09 22:46:59 scheduler_host airflow[18882]: airflow.exceptions.AirflowTaskTimeout: Timeout, PID: 27724
> Jan 09 22:46:59 scheduler_host airflow[18882]: [2020-01-09 22:46:59,764] {celery_executor.py:224} ERROR - Error sending Celery task:Timeout, PID: 27725
> {code}
> The error is raised by the following code.
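> The {{timeout(seconds=2)}} is hardcoded:
> {code:python}
> import traceback
>
> from airflow.utils.timeout import timeout
> # ExceptionWithTraceback is defined in airflow/executors/celery_executor.py itself.
>
> def send_task_to_executor(task_tuple):
>     key, simple_ti, command, queue, task = task_tuple
>     try:
>         # Hardcoded 2-second deadline: a slow NFS import inside
>         # apply_async() is enough to exceed it.
>         with timeout(seconds=2):
>             result = task.apply_async(args=[command], queue=queue)
>     except Exception as e:
>         exception_traceback = "Celery Task ID: {}\n{}".format(
>             key, traceback.format_exc())
>         result = ExceptionWithTraceback(e, exception_traceback)
>     return key, command, result
> {code}

--
This message was sent by Atlassian Jira
(v8.3.4#803005)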