Lioscro opened a new issue, #38599:
URL: https://github.com/apache/airflow/issues/38599

   ### Apache Airflow version
   
   Other Airflow 2 version (please specify below)
   
   ### If "Other Airflow 2 version" selected, which one?
   
   2.6.3
   
   ### What happened?
   
   Our Airflow deployment runs 4 triggerer instances. Under high load, we would 
often see triggerer containers sporadically crashing (and being restarted by 
our container orchestration system) with the following error.
   
   ```
   Traceback (most recent call last):
   3/9/2024 5:32:22 PM  File 
"/opt/pyenv/versions/3.11.7/lib/python3.11/threading.py", line 1045, in 
_bootstrap_inner
   3/9/2024 5:32:22 PM    self.run()
   3/9/2024 5:32:22 PM  File 
"/opt/pyenv/versions/3.11.7/lib/python3.11/site-packages/sentry_sdk/integrations/threading.py",
 line 72, in run
   3/9/2024 5:32:22 PM    reraise(*_capture_exception())
   3/9/2024 5:32:22 PM  File 
"/opt/pyenv/versions/3.11.7/lib/python3.11/site-packages/sentry_sdk/_compat.py",
 line 60, in reraise
   3/9/2024 5:32:22 PM    raise value
   3/9/2024 5:32:22 PM  File 
"/opt/pyenv/versions/3.11.7/lib/python3.11/site-packages/sentry_sdk/integrations/threading.py",
 line 70, in run
   3/9/2024 5:32:22 PM    return old_run_func(self, *a, **kw)
   3/9/2024 5:32:22 PM           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
   3/9/2024 5:32:22 PM  File 
"/opt/pyenv/versions/3.11.7/lib/python3.11/site-packages/airflow/jobs/triggerer_job_runner.py",
 line 461, in run
   3/9/2024 5:32:22 PM    asyncio.run(self.arun())
   3/9/2024 5:32:22 PM  File 
"/opt/pyenv/versions/3.11.7/lib/python3.11/asyncio/runners.py", line 190, in run
   3/9/2024 5:32:22 PM    return runner.run(main)
   3/9/2024 5:32:22 PM           ^^^^^^^^^^^^^^^^
   3/9/2024 5:32:22 PM  File 
"/opt/pyenv/versions/3.11.7/lib/python3.11/asyncio/runners.py", line 118, in run
   3/9/2024 5:32:22 PM    return self._loop.run_until_complete(task)
   3/9/2024 5:32:22 PM           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
   3/9/2024 5:32:22 PM  File 
"/opt/pyenv/versions/3.11.7/lib/python3.11/asyncio/base_events.py", line 653, 
in run_until_complete
   3/9/2024 5:32:22 PM    return future.result()
   3/9/2024 5:32:22 PM           ^^^^^^^^^^^^^^^
   3/9/2024 5:32:22 PM  File 
"/opt/pyenv/versions/3.11.7/lib/python3.11/site-packages/airflow/jobs/triggerer_job_runner.py",
 line 475, in arun
   3/9/2024 5:32:22 PM    await self.create_triggers()
   3/9/2024 5:32:22 PM  File 
"/opt/pyenv/versions/3.11.7/lib/python3.11/site-packages/airflow/jobs/triggerer_job_runner.py",
 line 500, in create_triggers
   3/9/2024 5:32:22 PM    dag_id = task_instance.dag_id
   3/9/2024 5:32:22 PM             ^^^^^^^^^^^^^^^^^^^^
   3/9/2024 5:32:22 PMAttributeError: 'NoneType' object has no attribute 
'dag_id'
   ```
   
   This is the same error and behavior that @tomrutter observed in 
https://github.com/apache/airflow/issues/32091
   
   I believe I've identified the root cause as a race condition that may happen 
especially if the `TriggererJobRunner`'s main loop (`_run_trigger_loop‎`) is 
delayed due to high load. For context, our triggerer containers share a host 
with Celery worker containers. It seems there is a small window (which gets 
larger with higher load) where, if you are running multiple triggerer 
instances, one triggerer could clear the `TaskInstance.trigger_id` while 
another is about to start executing it. Consider the following scenario.
   
   1. `TaskInstance` TI1 defers itself, which creates `Trigger` T1, which holds 
a reference to TI1.
   2. T1 gets picked up by `TriggererJobRunner` TJR1 and starts running T1.
   3. TJR1 misses a heartbeat, most likely due to high host load causing delays 
in each `TriggererJobRunner._run_trigger_loop`‎ loop.
   4. A second `TriggererJobRunner` TJR2 notices that T1 has missed its 
heartbeat, so it starts the process of picking up any Triggers that TJR1 may 
have had, including T1.
   5. Before TJR2 starts executing T1, TJR1 finishes execution of T1 and cleans 
it up by clearing the `trigger_id` of TI1.
   6. TJR2 tries to execute T1, but it crashes (with the above error) while 
trying to look up TI1 (because T1 no longer has a TaskInstance linked to it).
   
   When a new `Trigger` is created, it is guaranteed to have a linked 
`TaskInstance` because the creation of the `Trigger` and update to the 
`TaskInstance`'s `trigger_id` field are committed together.
   
https://github.com/apache/airflow/blob/eb24742d5300d2d87b17b4bcd67f639dbafd9818/airflow/models/taskinstance.py#L1414
   The only places I could find where the `TaskInstance`'s `trigger_id` is 
modified are in the `Trigger.submit_event` and `Trigger.submit_failure`.
   
https://github.com/apache/airflow/blob/eb24742d5300d2d87b17b4bcd67f639dbafd9818/airflow/models/trigger.py#L141
   
https://github.com/apache/airflow/blob/eb24742d5300d2d87b17b4bcd67f639dbafd9818/airflow/models/trigger.py#L161
   
   So it seems to me that we can assume that any `Trigger` that does not have 
an associated `TaskInstance` has already been handled correctly by another 
`TriggerJobRunner`. I was able to fix this behavior in our deployment with a 
simple patch to skip `Trigger`s without a `TaskInstance`.
   
   ```python
   diff --git a/airflow/jobs/triggerer_job_runner.py 
b/airflow/jobs/triggerer_job_runner.py
   index c1168a09b1..1f71b2abe7 100644
   --- a/airflow/jobs/triggerer_job_runner.py
   +++ b/airflow/jobs/triggerer_job_runner.py
   @@ -689,6 +689,16 @@ class TriggerRunner(threading.Thread, LoggingMixin):
                    self.failed_triggers.append((new_id, e))
                    continue
    
   +            # If new_trigger_orm.task_instance is None, this means the 
TaskInstance
   +            # row was updated by either Trigger.submit_event or 
Trigger.submit_failure
   +            # and can happen when a single trigger Job is being run on 
multiple TriggerRunners
   +            # in a High-Availability setup.
   +            if new_trigger_orm.task_instance is None:
   +                self.log.warning(
   +                    "TaskInstance for Trigger ID %s is None. Skipping 
trigger instantiation.", new_id
   +                )
   +                continue
   +
                try:
                    new_trigger_instance = 
trigger_class(**new_trigger_orm.kwargs)
                except TypeError as err:
   ```
   
   Happy to submit a PR!
   
   ### What you think should happen instead?
   
   Triggerer containers shouldn't crash due to a race condition.
   
   ### How to reproduce
   
   Happens very sporadically and intermittently, so very difficult to 
reproduce. I've also played around with a toy Airflow deployment locally but 
haven't found a way to reproduce it. Suggestions welcome!
   
   ### Operating System
   
   Ubuntu 22.04.3 LTS
   
   ### Versions of Apache Airflow Providers
   
   _No response_
   
   ### Deployment
   
   Other Docker-based deployment
   
   ### Deployment details
   
   _No response_
   
   ### Anything else?
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [X] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of 
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to