Lioscro commented on code in PR #38666: URL: https://github.com/apache/airflow/pull/38666#discussion_r1564458689
########## tests/jobs/test_triggerer_job.py: ########## @@ -309,6 +309,140 @@ def test_update_trigger_with_triggerer_argument_change( assert "got an unexpected keyword argument 'not_exists_arg'" in caplog.text +def test_trigger_create_race_condition_38599(session, tmp_path): + """ + This verifies the resolution of race condition documented in github issue #38599. + More details in the issue description. + + The race condition may occur in the following scenario: + 1. TaskInstance TI1 defers itself, which creates Trigger T1, which holds a + reference to TI1. + 2. T1 gets picked up by TriggererJobRunner TJR1 and starts running T1. + 3. TJR1 misses a heartbeat, most likely due to high host load causing delays in + each TriggererJobRunner._run_trigger_loop loop. + 4. A second TriggererJobRunner TJR2 notices that T1 has missed its heartbeat, + so it starts the process of picking up any Triggers that TJR1 may have had, + including T1. + 5. Before TJR2 starts executing T1, TJR1 finishes execution of T1 and cleans it + up by clearing the trigger_id of TI1. + 6. TJR2 tries to execute T1, but it crashes (with the above error) while trying to + look up TI1 (because T1 no longer has a TaskInstance linked to it). 
+ """ + path = tmp_path / "test_trigger_create_after_completion.txt" + trigger = TimeDeltaTrigger_(delta=datetime.timedelta(microseconds=1), filename=path.as_posix()) + trigger_orm = Trigger.from_object(trigger) + trigger_orm.id = 1 + session.add(trigger_orm) + + dag = DagModel(dag_id="test-dag") + dag_run = DagRun(dag.dag_id, run_id="abc", run_type="none") + ti = TaskInstance( + PythonOperator(task_id="dummy-task", python_callable=print), + run_id=dag_run.run_id, + state=TaskInstanceState.DEFERRED, + ) + ti.dag_id = dag.dag_id + ti.trigger_id = 1 + session.add(dag) + session.add(dag_run) + session.add(ti) + + job1 = Job() + job2 = Job() + session.add(job1) + session.add(job2) + + session.commit() + + class TriggerRunnerWithCreateCount_(TriggerRunner): + async def create_triggers(self): + num_triggers_to_create = len(self.to_create) + await super().create_triggers() + self.trigger_creation_count = getattr(self, "trigger_creation_count", 0) + num_triggers_to_create + + class TriggerRunnerWithUpdateDelay_(TriggerRunnerWithCreateCount_): + """TriggerRunner with a 5 second delay added at the beginning of update_triggers + to increase the window that the race condition may occur. 
+ """ + + def update_triggers(self, *args, **kwargs): + # Delay calling update_triggers to increase the window of opportunity + time.sleep(5) + super().update_triggers(*args, **kwargs) + + async def create_triggers(self): + await super().create_triggers() + self.create_triggers_count = getattr(self, "create_triggers_count", 0) + 1 + + class TriggererJobRunner_(TriggererJobRunner): + """TriggererJobRunner whose handle_events blocks until there is an event.""" + + def load_triggers(self): + super().load_triggers() + self.load_triggers_count = getattr(self, "load_triggers_count", 0) + 1 + + def handle_events(self): + # Wait for event during the first loop + while not self.trigger_runner.events and getattr(self, "handle_events_count", 0) == 0: + time.sleep(0.1) + super().handle_events() + self.handle_events_count = getattr(self, "handle_events_count", 0) + 1 + # Prevent Trigger.clean_unused() from deleting the trigger + time.sleep(5) Review Comment: Alright, I changed the test to take a slightly different approach, calling triggerer functions manually so that the execution timings are more tightly controlled. Let me know how it looks! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org