yuqian90 commented on issue #10790: URL: https://github.com/apache/airflow/issues/10790#issuecomment-690136389
Hi @turbaszek, any findings on this? We have a CeleryExecutor + Redis setup with three workers (apache-airflow 1.10.12). The airflow-scheduler log has a lot of lines like this. I remember this was already a problem on older versions such as 1.10.10; we just never paid much attention to it.

```
{taskinstance.py:1150} ERROR - Executor reports task instance <TaskInstance: ... [queued]> finished (success) although the task says its queued. Was the task killed externally?
```

Like others in this thread, we have a lot of sensors in `reschedule` mode with `poke_interval` set to 60s, and these are the ones that most often hit this error. So far our workaround has been to add `retries=3` to these sensors, so that when the error happens the task retries and we don't get any spam. This is definitely not a great long-term solution, though: such sensors go into the `up_for_retry` state when this happens.

I also tried tweaking these parameters; they don't seem to matter much as far as this error is concerned:

```
parallelism = 1024
dag_concurrency = 128
max_threads = 8
```

The way to reproduce this issue seems to be to create a DAG with a bunch of parallel `reschedule` sensors and make the DAG slow to import, for example like the one below. If we add a `time.sleep(30)` at the end to simulate a slow-to-import DAG, this error happens a lot for such sensors. You may also need to tweak `dagbag_import_timeout` and `dag_file_processor_timeout` if adding the `sleep` causes DAGs to fail to import altogether. When the scheduler starts to process this DAG, we then start to see the above error happening to these sensors, and they go into `up_for_retry`.
```python
import datetime
import time

from airflow.models.dag import DAG
from airflow.contrib.sensors.python_sensor import PythonSensor

with DAG(
    dag_id="test_dag_slow",
    start_date=datetime.datetime(2020, 9, 8),
    schedule_interval="@daily",
) as dag:
    sensors = [
        PythonSensor(
            task_id=f"sensor_{i}",
            python_callable=lambda: False,
            mode="reschedule",
            retries=2,
        )
        for i in range(20)
    ]

# Simulate a slow-to-import DAG file
time.sleep(30)
```
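If the added `sleep` makes the file fail to import, the two timeouts mentioned above can be raised in `airflow.cfg`. A minimal sketch, assuming the 1.10.x `[core]` section; the values here are illustrative, not recommendations:

```ini
[core]
# Max seconds the DagBag may spend importing a single DAG file
dagbag_import_timeout = 60
# Max seconds the DAG file processor may spend on one file;
# should be larger than dagbag_import_timeout
dag_file_processor_timeout = 90
```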