Lioscro opened a new issue, #38599: URL: https://github.com/apache/airflow/issues/38599
### Apache Airflow version

Other Airflow 2 version (please specify below)

### If "Other Airflow 2 version" selected, which one?

2.6.3

### What happened?

Our Airflow deployment runs 4 triggerer instances. Under high load, we often see triggerer containers crash sporadically (and get restarted by our container orchestration system) with the following error.

```
Traceback (most recent call last):
3/9/2024 5:32:22 PM   File "/opt/pyenv/versions/3.11.7/lib/python3.11/threading.py", line 1045, in _bootstrap_inner
3/9/2024 5:32:22 PM     self.run()
3/9/2024 5:32:22 PM   File "/opt/pyenv/versions/3.11.7/lib/python3.11/site-packages/sentry_sdk/integrations/threading.py", line 72, in run
3/9/2024 5:32:22 PM     reraise(*_capture_exception())
3/9/2024 5:32:22 PM   File "/opt/pyenv/versions/3.11.7/lib/python3.11/site-packages/sentry_sdk/_compat.py", line 60, in reraise
3/9/2024 5:32:22 PM     raise value
3/9/2024 5:32:22 PM   File "/opt/pyenv/versions/3.11.7/lib/python3.11/site-packages/sentry_sdk/integrations/threading.py", line 70, in run
3/9/2024 5:32:22 PM     return old_run_func(self, *a, **kw)
3/9/2024 5:32:22 PM            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
3/9/2024 5:32:22 PM   File "/opt/pyenv/versions/3.11.7/lib/python3.11/site-packages/airflow/jobs/triggerer_job_runner.py", line 461, in run
3/9/2024 5:32:22 PM     asyncio.run(self.arun())
3/9/2024 5:32:22 PM   File "/opt/pyenv/versions/3.11.7/lib/python3.11/asyncio/runners.py", line 190, in run
3/9/2024 5:32:22 PM     return runner.run(main)
3/9/2024 5:32:22 PM            ^^^^^^^^^^^^^^^^
3/9/2024 5:32:22 PM   File "/opt/pyenv/versions/3.11.7/lib/python3.11/asyncio/runners.py", line 118, in run
3/9/2024 5:32:22 PM     return self._loop.run_until_complete(task)
3/9/2024 5:32:22 PM            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
3/9/2024 5:32:22 PM   File "/opt/pyenv/versions/3.11.7/lib/python3.11/asyncio/base_events.py", line 653, in run_until_complete
3/9/2024 5:32:22 PM     return future.result()
3/9/2024 5:32:22 PM            ^^^^^^^^^^^^^^^
3/9/2024 5:32:22 PM   File "/opt/pyenv/versions/3.11.7/lib/python3.11/site-packages/airflow/jobs/triggerer_job_runner.py", line 475, in arun
3/9/2024 5:32:22 PM     await self.create_triggers()
3/9/2024 5:32:22 PM   File "/opt/pyenv/versions/3.11.7/lib/python3.11/site-packages/airflow/jobs/triggerer_job_runner.py", line 500, in create_triggers
3/9/2024 5:32:22 PM     dag_id = task_instance.dag_id
3/9/2024 5:32:22 PM              ^^^^^^^^^^^^^^^^^^^^
3/9/2024 5:32:22 PM AttributeError: 'NoneType' object has no attribute 'dag_id'
```

This is the same error and behavior that @tomrutter observed in https://github.com/apache/airflow/issues/32091

I believe the root cause is a race condition that becomes more likely when the `TriggererJobRunner`'s main loop (`_run_trigger_loop`) is delayed by high load. For context, our triggerer containers share a host with Celery worker containers.

When running multiple triggerer instances, there seems to be a small window (which grows with load) in which one triggerer can clear the `TaskInstance.trigger_id` while another is about to start executing the same trigger. Consider the following scenario.

1. `TaskInstance` TI1 defers itself, which creates `Trigger` T1, which holds a reference to TI1.
2. `TriggererJobRunner` TJR1 picks up T1 and starts running it.
3. TJR1 misses a heartbeat, most likely because high host load delays each iteration of `TriggererJobRunner._run_trigger_loop`.
4. A second `TriggererJobRunner`, TJR2, notices that TJR1 has missed its heartbeat, so it starts picking up any Triggers that TJR1 may have had, including T1.
5. Before TJR2 starts executing T1, TJR1 finishes executing T1 and cleans it up by clearing the `trigger_id` of TI1.
6. TJR2 tries to execute T1, but crashes (with the above error) while looking up TI1, because T1 no longer has a `TaskInstance` linked to it.
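As a rough illustration, the interleaving in the scenario above can be replayed deterministically with an in-memory toy model. This is only a sketch: `FakeDB`, `create_trigger`, `simulate_race`, and the two dataclasses are hypothetical stand-ins, not Airflow's models or the real triggerer code, and the steps run sequentially rather than in separate processes.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class TaskInstance:  # hypothetical stand-in, not airflow.models.TaskInstance
    dag_id: str
    trigger_id: Optional[int]


@dataclass
class Trigger:  # hypothetical stand-in, not airflow.models.Trigger
    id: int


class FakeDB:
    """Toy in-memory 'database' holding one trigger (T1) and one task instance (TI1)."""

    def __init__(self) -> None:
        self.triggers = {1: Trigger(id=1)}
        self.task_instances = [TaskInstance(dag_id="example_dag", trigger_id=1)]

    def task_instance_for(self, trigger_id: int) -> Optional[TaskInstance]:
        # Mimics following the Trigger -> TaskInstance link.
        for ti in self.task_instances:
            if ti.trigger_id == trigger_id:
                return ti
        return None

    def submit_event(self, trigger_id: int) -> None:
        # Mimics the cleanup after a trigger fires: unlink the TaskInstance.
        for ti in self.task_instances:
            if ti.trigger_id == trigger_id:
                ti.trigger_id = None


def create_trigger(db: FakeDB, trigger_id: int) -> str:
    # Mimics the unguarded lookup that fails in the traceback.
    task_instance = db.task_instance_for(trigger_id)
    return task_instance.dag_id  # raises AttributeError if task_instance is None


def simulate_race() -> str:
    db = FakeDB()
    # Steps 1-2: TJR1 is running T1 (trigger id 1).
    # Steps 3-4: TJR1 misses a heartbeat; TJR2 claims T1 but has not started it yet.
    claimed_by_tjr2 = [1]
    # Step 5: TJR1 finishes T1 and clears TI1.trigger_id.
    db.submit_event(1)
    # Step 6: TJR2 tries to start T1 and finds no linked TaskInstance.
    try:
        for trigger_id in claimed_by_tjr2:
            create_trigger(db, trigger_id)
    except AttributeError as err:
        return str(err)
    return "no error"


print(simulate_race())
```

In this toy model the call ends in the same kind of `AttributeError` as the traceback, because the lookup in step 6 returns `None` once step 5 has cleared the link.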
When a new `Trigger` is created, it is guaranteed to have a linked `TaskInstance` because the creation of the `Trigger` and the update to the `TaskInstance`'s `trigger_id` field are committed together.

https://github.com/apache/airflow/blob/eb24742d5300d2d87b17b4bcd67f639dbafd9818/airflow/models/taskinstance.py#L1414

The only places I could find where the `TaskInstance`'s `trigger_id` is modified are `Trigger.submit_event` and `Trigger.submit_failure`.

https://github.com/apache/airflow/blob/eb24742d5300d2d87b17b4bcd67f639dbafd9818/airflow/models/trigger.py#L141
https://github.com/apache/airflow/blob/eb24742d5300d2d87b17b4bcd67f639dbafd9818/airflow/models/trigger.py#L161

So it seems safe to assume that any `Trigger` without an associated `TaskInstance` has already been handled correctly by another `TriggererJobRunner`.

I was able to fix this behavior in our deployment with a simple patch that skips `Trigger`s without a `TaskInstance`.

```diff
diff --git a/airflow/jobs/triggerer_job_runner.py b/airflow/jobs/triggerer_job_runner.py
index c1168a09b1..1f71b2abe7 100644
--- a/airflow/jobs/triggerer_job_runner.py
+++ b/airflow/jobs/triggerer_job_runner.py
@@ -689,6 +689,16 @@ class TriggerRunner(threading.Thread, LoggingMixin):
                 self.failed_triggers.append((new_id, e))
                 continue
 
+            # If new_trigger_orm.task_instance is None, this means the TaskInstance
+            # row was updated by either Trigger.submit_event or Trigger.submit_failure
+            # and can happen when a single trigger Job is being run on multiple TriggerRunners
+            # in a High-Availability setup.
+            if new_trigger_orm.task_instance is None:
+                self.log.warning(
+                    "TaskInstance for Trigger ID %s is None. Skipping trigger instantiation.", new_id
+                )
+                continue
+
             try:
                 new_trigger_instance = trigger_class(**new_trigger_orm.kwargs)
             except TypeError as err:
```

Happy to submit a PR!

### What you think should happen instead?

Triggerer containers shouldn't crash due to a race condition.
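Put differently, a trigger row whose `TaskInstance` link has already been cleared should be skipped with a warning rather than dereferenced. A minimal sketch of that expectation follows, using hypothetical stand-in classes (`FakeTriggerRow`, `FakeTaskInstance`) and a hypothetical helper `startable_triggers`; none of these exist in Airflow, and the real check would live inside the triggerer's own loop.

```python
import logging
from dataclasses import dataclass
from typing import List, Optional, Tuple

log = logging.getLogger("triggerer_sketch")  # hypothetical logger name


@dataclass
class FakeTaskInstance:  # hypothetical stand-in for a TaskInstance row
    dag_id: str


@dataclass
class FakeTriggerRow:  # hypothetical stand-in for a Trigger ORM row
    id: int
    task_instance: Optional[FakeTaskInstance]


def startable_triggers(rows: List[FakeTriggerRow]) -> List[Tuple[int, str]]:
    """Return (trigger_id, dag_id) pairs, skipping rows with no linked TaskInstance."""
    startable = []
    for row in rows:
        if row.task_instance is None:
            # Assumed to mean another triggerer already handled this trigger.
            log.warning(
                "TaskInstance for Trigger ID %s is None. Skipping trigger instantiation.",
                row.id,
            )
            continue
        startable.append((row.id, row.task_instance.dag_id))
    return startable


rows = [
    FakeTriggerRow(id=1, task_instance=None),  # already cleaned up elsewhere
    FakeTriggerRow(id=2, task_instance=FakeTaskInstance(dag_id="example_dag")),
]
print(startable_triggers(rows))  # [(2, 'example_dag')]
```

The orphaned row is dropped before any attribute access, so the loop continues with the remaining triggers instead of taking down the whole process.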
### How to reproduce

The crash happens very sporadically and intermittently, so it is very difficult to reproduce. I've also played around with a toy Airflow deployment locally but haven't found a way to reproduce it. Suggestions welcome!

### Operating System

Ubuntu 22.04.3 LTS

### Versions of Apache Airflow Providers

_No response_

### Deployment

Other Docker-based deployment

### Deployment details

_No response_

### Anything else?

_No response_

### Are you willing to submit PR?

- [X] Yes I am willing to submit a PR!

### Code of Conduct

- [X] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org