yuqian90 commented on issue #10790:
URL: https://github.com/apache/airflow/issues/10790#issuecomment-690136389


   Hi @turbaszek, any findings on this? We have a CeleryExecutor + Redis setup with three workers (apache-airflow 1.10.12). The airflow-scheduler log has a lot of lines like the one below. I remember this was already a problem on older versions such as 1.10.10; it's just that we never paid much attention to it.
   
   ```
   {taskinstance.py:1150} ERROR - Executor reports task instance <TaskInstance: 
... [queued]> finished (success) although the task says its queued. Was the 
task killed externally?
   ```
   
   As with others in this thread, we have a lot of sensors in `reschedule` mode with `poke_interval` set to 60s, and these are the ones that most often hit this error. So far our workaround has been to add `retries=3` to these sensors: when the error happens, the sensor goes into `up_for_retry` and is retried, so we don't get any spam. This is definitely not a great long-term solution though.
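   For reference, the workaround looks roughly like this (a minimal sketch; the `lambda: False` stands in for a real poke callable):
   
   ```python
   import datetime
   
   from airflow.models.dag import DAG
   from airflow.contrib.sensors.python_sensor import PythonSensor
   
   with DAG(
       dag_id="example_retry_workaround",
       start_date=datetime.datetime(2020, 9, 8),
       schedule_interval="@daily",
   ) as dag:
       wait_for_data = PythonSensor(
           task_id="wait_for_data",
           python_callable=lambda: False,  # placeholder poke callable
           mode="reschedule",
           poke_interval=60,
           # the retries absorb the spurious "killed externally" failure:
           # the sensor goes to up_for_retry and is then rescheduled again
           retries=3,
       )
   ```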
   
   I also tried to tweak these parameters. They don't seem to matter much as 
far as this error is concerned:
   
   ```
   parallelism = 1024
   dag_concurrency = 128
   max_threads = 8
   ```
   
   The way to reproduce this issue seems to be to create a DAG with a bunch of parallel `reschedule` sensors and make the DAG slow to import, for example like the DAG at the end of this comment. Adding a `time.sleep(30)` at the end of the file to simulate a slow-to-import DAG makes this error happen a lot for such sensors. You may also need to tweak `dagbag_import_timeout` and `dag_file_processor_timeout` if adding the `sleep` causes DAGs to fail to import altogether.
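   If it helps, both settings live under `[core]` in `airflow.cfg` in 1.10.x; the values below are just an illustration, chosen to be generous enough to survive the added `sleep`:
   
   ```
   [core]
   # defaults are 30s and 50s respectively; raise both so the
   # artificially slowed DAG file still imports before timing out
   dagbag_import_timeout = 90
   dag_file_processor_timeout = 120
   ```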
   
   
   When the scheduler starts to process this DAG, we start to see the above error for these sensors, and they go into `up_for_retry`.
   
   ```python
   import datetime
   import time
   
   from airflow.models.dag import DAG
   from airflow.contrib.sensors.python_sensor import PythonSensor
   
   with DAG(
       dag_id="test_dag_slow",
       start_date=datetime.datetime(2020, 9, 8),
       schedule_interval="@daily",
   ) as dag:
       sensors = [
           PythonSensor(
               task_id=f"sensor_{i}",
               # the callable always returns False, so the sensor keeps
               # getting rescheduled every poke_interval
               python_callable=lambda: False,
               mode="reschedule",
               poke_interval=60,
               retries=2,
           ) for i in range(20)
       ]
       # simulate a slow-to-import DAG file
       time.sleep(30)
   ```

