This is an automated email from the ASF dual-hosted git repository. ash pushed a commit to branch v2-0-test in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 0fc6ca33fd698c123875454b64299b9af4dd4877 Author: Kaxil Naik <[email protected]> AuthorDate: Fri Mar 5 19:34:17 2021 +0000 Default to Celery Task model when backend model does not exist (#14612) closes https://github.com/apache/airflow/issues/14586 We add this feature in https://github.com/apache/airflow/pull/12336 but looks like for some users this attribute does not exist. I am not sure if they are using a different Celery DB Backend or not but even Celery > 5 contains this attribute (https://github.com/celery/celery/blob/v5.0.5/celery/backends/database/__init__.py#L66) and even Celery 4 but this commits use the Celery Task model when an attribute error occurs (cherry picked from commit 33910d6c699b5528db4be40d31199626dafed912) --- airflow/executors/celery_executor.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/airflow/executors/celery_executor.py b/airflow/executors/celery_executor.py index 8bbaed1..a670294 100644 --- a/airflow/executors/celery_executor.py +++ b/airflow/executors/celery_executor.py @@ -35,7 +35,7 @@ from typing import Any, Dict, List, Mapping, MutableMapping, Optional, Set, Tupl from celery import Celery, Task, states as celery_states from celery.backends.base import BaseKeyValueStoreBackend -from celery.backends.database import DatabaseBackend, session_cleanup +from celery.backends.database import DatabaseBackend, Task as TaskDb, session_cleanup from celery.result import AsyncResult from celery.signals import import_modules as celery_import_modules from setproctitle import setproctitle # pylint: disable=no-name-in-module @@ -567,7 +567,7 @@ class BulkStateFetcher(LoggingMixin): def _get_many_from_db_backend(self, async_tasks) -> Mapping[str, EventBufferValueType]: task_ids = _tasks_list_to_task_ids(async_tasks) session = app.backend.ResultSession() - task_cls = app.backend.task_cls + task_cls = getattr(app.backend, "task_cls", TaskDb) with session_cleanup(session): tasks = session.query(task_cls).filter(task_cls.task_id.in_(task_ids)).all()
