argibbs opened a new issue, #15596:
URL: https://github.com/apache/airflow/issues/15596

   **Apache Airflow version**: 2.0.1 and 2.0.2
   
   **Kubernetes version (if you are using kubernetes)** (use `kubectl version`): N/A
   
   **Environment**: Celery executors, Redis + Postgres
   
   - **Cloud provider or hardware configuration**: Running inside docker
   - **OS** (e.g. from /etc/os-release): Centos (inside Docker)
   
   **What happens**:
   
   In 2.0.0, if you delete a dag from the GUI while the `.py` file is still present, the dag is re-added within a few seconds (albeit with no history, etc.). Upon attempting to upgrade to 2.0.1, we found that after deleting a dag it would take tens of minutes (or more!) to come back, and its reappearance was seemingly random (i.e. restarting schedulers / GUIs did not help).
   
   It did not seem to matter which dag it was.
   
   The problem still exists in 2.0.2.
   
   **What you expected to happen**:
   
   Deleting a dag should result in that dag being re-added in short order if 
the `.py` file is still present.
   
   **Likely cause**
   
   I've tracked it back to an issue with SLA callbacks. I strongly suspect the 
fix for Issue #14050 was inadvertently responsible, since that was in the 2.0.1 
release. In a nutshell, it appears the dag_processor_manager gets into a state 
where on every single pass it takes so long to process SLA checks for one of 
the dag files that the entire processor times out and is killed. As a result, 
some of the dag files (that are queued behind the poison pill file) never get 
processed and thus we don't reinstate the deleted dag unless the system gets 
quiet and the SLA checks clear down.
   
   To reproduce in _my_ setup, I created a clean airflow instance. The only 
materially important config setting I use is 
`AIRFLOW__SCHEDULER__PARSING_PROCESSES=1` which helps keep things deterministic.
   
   I then started adding in dag files from the production system until I found 
a file that caused the problem. Most of our dags do not have SLAs, but this one 
did. After adding it, I started seeing lines like this in 
`dag_processor_manager.log` (file names have been changed to keep things simple)
   
   ```
   [2021-04-29 16:27:19,259] {dag_processing.py:1129} ERROR - Processor for /home/airflow/dags/problematic.py with PID 309 started at 2021-04-29T16:24:19.073027+00:00 has timed out, killing it.
   ```
   Additionally, the stats contained lines like:
   ```
   File Path                            PID  Runtime      # DAGs    # Errors  Last Runtime    Last Run
   ---------------------------------  -----  ---------  --------  ----------  --------------  -------------------
   /home/airflow/dags/problematic.py    309  167.29s           8           0  158.78s         2021-04-29T16:24:19
   ```
   (i.e. 3 minutes to process a single file!)
   
   Of note, the parse time of the affected file got longer on each pass until 
the processor was killed. Increasing 
`AIRFLOW__CORE__DAG_FILE_PROCESSOR_TIMEOUT` to e.g. 300 did nothing to help; it 
simply bought a few more iterations of the parse loop before it blew up.
   
   Browsing the log file for `scheduler/2021-04-29/problematic.py.log` I could 
see the following:
   
   <details><summary>Log file entries in 2.0.2</summary> 
   
   ```
   [2021-04-29 16:06:44,633] {scheduler_job.py:629} INFO - Processing file /home/airflow/dags/problematic.py for tasks to queue
   [2021-04-29 16:06:44,634] {logging_mixin.py:104} INFO - [2021-04-29 16:06:44,634] {dagbag.py:451} INFO - Filling up the DagBag from /home/airflow/dags/problematic
   [2021-04-29 16:06:45,001] {scheduler_job.py:639} INFO - DAG(s) dict_keys(['PARQUET-BASIC-DATA-PIPELINE-YESTERDAY-S1-weekends', 'PARQUET-BASIC-DATA-PIPELINE-YESTERDAY-APPEND-S1-weekends', 'PARQUET-BASIC-DATA-PIPELINE-TODAY-S1-weekends', 'PARQUET-BASIC-DATA-PIPELINE-TODAY-APPEND-S1-weekends', 'PARQUET-BASIC-DATA-PIPELINE-TODAY-S2-weekends', 'PARQUET-BASIC-DATA-PIPELINE-YESTERDAY-S2-weekends', 'PARQUET-BASIC-DATA-PIPELINE-TODAY-S3-weekends', 'PARQUET-BASIC-DATA-PIPELINE-YESTERDAY-S3-weekends']) retrieved from /home/airflow/dags/problematic.py
   [2021-04-29 16:06:45,001] {scheduler_job.py:396} INFO - Running SLA Checks for PARQUET-BASIC-DATA-PIPELINE-YESTERDAY-APPEND-S1-weekends
   [2021-04-29 16:06:46,398] {scheduler_job.py:396} INFO - Running SLA Checks for PARQUET-BASIC-DATA-PIPELINE-YESTERDAY-APPEND-S1-weekends
   [2021-04-29 16:06:47,615] {scheduler_job.py:396} INFO - Running SLA Checks for PARQUET-BASIC-DATA-PIPELINE-YESTERDAY-APPEND-S1-weekends
   [2021-04-29 16:06:48,852] {scheduler_job.py:396} INFO - Running SLA Checks for PARQUET-BASIC-DATA-PIPELINE-TODAY-APPEND-S1-weekends
   [2021-04-29 16:06:49,411] {scheduler_job.py:396} INFO - Running SLA Checks for PARQUET-BASIC-DATA-PIPELINE-TODAY-APPEND-S1-weekends
   [2021-04-29 16:06:50,156] {scheduler_job.py:396} INFO - Running SLA Checks for PARQUET-BASIC-DATA-PIPELINE-TODAY-APPEND-S1-weekends
   [2021-04-29 16:06:50,845] {scheduler_job.py:396} INFO - Running SLA Checks for PARQUET-BASIC-DATA-PIPELINE-YESTERDAY-APPEND-SP500_Index_1-weekends
   [2021-04-29 16:06:52,164] {scheduler_job.py:396} INFO - Running SLA Checks for PARQUET-BASIC-DATA-PIPELINE-YESTERDAY-APPEND-S1-weekends
   [2021-04-29 16:06:53,474] {scheduler_job.py:396} INFO - Running SLA Checks for PARQUET-BASIC-DATA-PIPELINE-YESTERDAY-APPEND-S1-weekends
   [2021-04-29 16:06:54,731] {scheduler_job.py:396} INFO - Running SLA Checks for PARQUET-BASIC-DATA-PIPELINE-TODAY-APPEND-SP500_Index_1-weekends
   [2021-04-29 16:06:55,345] {scheduler_job.py:396} INFO - Running SLA Checks for PARQUET-BASIC-DATA-PIPELINE-TODAY-APPEND-S1-weekends
   [2021-04-29 16:06:55,920] {scheduler_job.py:396} INFO - Running SLA Checks for PARQUET-BASIC-DATA-PIPELINE-TODAY-APPEND-S1-weekends
   
   and so on for 100+ more lines like this... 
   ```
   
   </details>
   
   Two important points from the above logs:
   1. We seem to be running SLA checks on the same dags multiple times.
   2. The number of checks grows on each pass, i.e. the number of log lines beginning "Running SLA Checks..." increases on each pass until the processor manager is restarted, and then it begins afresh (a rough way to count them per pass is sketched below).
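   
   If it helps anyone trying to confirm the second point, below is a rough way to count the "Running SLA Checks" lines per pass. This is only a sketch against my own setup: the log path and the matched strings are the ones from my logs above, so adjust as needed.
   
   ```
   # Rough sketch: count "Running SLA Checks" lines per processing pass in the
   # per-file scheduler log. The path and matched strings come from my logs
   # above; change them to suit your own setup.
   from collections import Counter
   
   LOG_PATH = "scheduler/2021-04-29/problematic.py.log"
   
   checks_per_pass = Counter()
   current_pass = 0
   with open(LOG_PATH) as fh:
       for line in fh:
           if "Processing file" in line and "for tasks to queue" in line:
               current_pass += 1                   # a new parse pass started
           elif "Running SLA Checks" in line:
               checks_per_pass[current_pass] += 1  # one more check this pass
   
   for parse_pass, count in sorted(checks_per_pass.items()):
       print(f"pass {parse_pass}: {count} SLA checks")
   ```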
   
   **Likely location of the problem**:
   
   This is where I start to run out of steam. I believe the culprit is this 
line: 
https://github.com/apache/airflow/blob/2.0.2/airflow/jobs/scheduler_job.py#L1813
   
   It seems to me that the above leads to a feedback loop: each time you send a dag callback to the processor, you include a free SLA callback as well, hence the steadily growing SLA-processing log messages / behaviour I observed. As noted above, this method call _was_ present in 2.0.0, but until Issue #14050 was fixed the SLAs were ignored, so the problem only kicked in from 2.0.1 onwards.
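   
   As a toy illustration of what I think is going on (to be clear: this is not Airflow code, and every name and number below is made up purely for illustration), if each pass has to run every SLA callback queued since the previous pass, and the queue refills faster than it drains, the per-pass time snowballs in exactly the way the runtimes above did:
   
   ```
   # Toy model of the suspected feedback loop -- NOT Airflow source; all names
   # and numbers are invented for illustration only.
   # Premise: each scheduler loop sends one SLA callback per dag in the file,
   # and each file-processing pass must run every callback queued so far.
   NUM_DAGS = 8              # dags defined in the problematic file
   SLA_CHECK_COST = 0.2      # pretend seconds of work per SLA callback
   SCHEDULER_INTERVAL = 1.0  # pretend seconds between scheduler loops
   
   pass_time = 0.0
   for parse_pass in range(1, 11):
       # Callbacks that piled up while the previous pass was running:
       scheduler_loops = max(1, int(pass_time // SCHEDULER_INTERVAL) + 1)
       queued_callbacks = scheduler_loops * NUM_DAGS
   
       # The processor drains the queue -- this is the growing wall of
       # "Running SLA Checks for ..." lines in the per-file log.
       pass_time = queued_callbacks * SLA_CHECK_COST
       print(f"pass {parse_pass}: {queued_callbacks} SLA checks, ~{pass_time:.1f}s")
   ```
   
   Once the per-pass time exceeds `AIRFLOW__CORE__DAG_FILE_PROCESSOR_TIMEOUT`, the processor is killed, which matches the "has timed out, killing it" errors above.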
   
   Unfortunately, my airflow-fu is not good enough for me to suggest a fix beyond the Gordian solution of removing the line completely (!); in particular, it's not clear to me how / where SLAs _should_ be checked. Should the dag_processor_manager be doing them? Should it be another component? (Naively, I would have thought it should be the workers, so that SLA checks can scale with the rest of your system.) How should the checks be enqueued? I dunno enough to give a good answer. 🤷 
   
   **How to reproduce it**:
   
   In our production system, it would blow up every time, immediately. 
_Reliably_ reproducing in a clean system depends on how fast your test system 
is; the trick appears to be getting the scan of the dag file to take long 
enough that the SLA checks start to snowball. The dag below did it for me; if 
your machine seems to be staying on top of processing the dags, try increasing 
the number of tasks in a single dag (or buy a slower computer!)
   
   <details><summary>Simple dag that causes the problem</summary>
   
   ```
   import datetime as dt
   import pendulum
   
   from airflow import DAG
   from airflow.operators.bash import BashOperator
   
   
   def create_graph(dag):
       prev_task = None
       for i in range(10):
           next_task = BashOperator(
               task_id=f'simple_task_{i}',
               bash_command="echo SLA issue",
               dag=dag)
   
           if prev_task:
               prev_task >> next_task
   
           prev_task = next_task
   
   
   def create_dag(name: str) -> DAG:
       tz_to_use = pendulum.timezone('UTC')
   
       default_args = {
           'owner': 'your.email.h...@email.co',
           'start_date': dt.datetime(2018, 11, 13, tzinfo=tz_to_use),
           'email': ['sla_f...@some.domain.or.other'],
           'email_on_failure': False,
           'email_on_retry': False,
           'sla': dt.timedelta(hours=13),
       }
   
       dag = DAG(name,
                 catchup=False,
                 default_args=default_args,
                 max_active_runs=10,
                 schedule_interval="* * * * *")
   
       create_graph(dag)
   
       return dag
   
   for i in range(100):
       name = f"sla_dag_{i}"
       globals()[name] = create_dag(name)
   ```
   
   </details>
   
   To reproduce:
   1. Configure an empty airflow instance, such that it only has one parsing process (as per the config above).
   2. Add the file above into the install. The file simply creates 100 near-trivial dags. On my system, airflow can't stay ahead and is basically permanently busy processing the backlog. Your cpu may have more hamsters, in which case you'll need to up the number of tasks and/or dags.
   3. Locate and tail the `scheduler/[date]/sla_example.py.log` file (assuming you called the above `sla_example.py`, of course).
   4. This is the non-deterministic part. On my system, within a few minutes the processor manager is taking noticeably longer to process the file and you should be able to see lots of SLA log messages like my example above ☝️. Like all good exponential growth, it takes many iterations to go from 1 second to 1.5 seconds to 2 seconds, but not very long at all to go from 10 seconds to 30 to 💥 
   
   **Anything else we need to know**:
   
   1. I'm working around this for now by simply removing the SLAs from the dag (a sketch of what I mean is below). This solves the problem since the SLA callbacks are then dropped. But SLAs are a great feature, and I'd like them back (please!).
   2. Thanks for making airflow, and thanks for making it this far down the report!
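   
   For what it's worth, the workaround in (1) is literally just dropping the `sla` key from `default_args` in the example dag above. A sketch of how I'm doing it (the environment variable is made up; it just makes it easy to switch SLAs back on once there's a fix):
   
   ```
   import datetime as dt
   import os
   
   # Workaround sketch for the example dag above: only attach the SLA when a
   # (made-up) environment variable says so. With no 'sla' in default_args,
   # the SLA callbacks are dropped and the snowballing stops.
   ENABLE_SLAS = os.environ.get("MY_DAGS_ENABLE_SLAS", "0") == "1"
   
   default_args = {
       'start_date': dt.datetime(2018, 11, 13),
       'email_on_failure': False,
       'email_on_retry': False,
   }
   
   if ENABLE_SLAS:
       # Flip MY_DAGS_ENABLE_SLAS=1 once the SLA callback snowball is fixed.
       default_args['sla'] = dt.timedelta(hours=13)
   ```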

