Lioscro commented on code in PR #38666:
URL: https://github.com/apache/airflow/pull/38666#discussion_r1564458689


##########
tests/jobs/test_triggerer_job.py:
##########
@@ -309,6 +309,140 @@ def test_update_trigger_with_triggerer_argument_change(
         assert "got an unexpected keyword argument 'not_exists_arg'" in 
caplog.text
 
 
+def test_trigger_create_race_condition_38599(session, tmp_path):
+    """
+    This verifies the resolution of the race condition documented in GitHub issue 
#38599.
+    More details in the issue description.
+
+    The race condition may occur in the following scenario:
+        1. TaskInstance TI1 defers itself, which creates Trigger T1, which 
holds a
+            reference to TI1.
+        2. T1 gets picked up by TriggererJobRunner TJR1 and starts running T1.
+        3. TJR1 misses a heartbeat, most likely due to high host load causing 
delays in
+            each TriggererJobRunner._run_trigger_loop loop.
+        4. A second TriggererJobRunner TJR2 notices that TJR1 has missed its 
heartbeat,
+            so it starts the process of picking up any Triggers that TJR1 may 
have had,
+            including T1.
+        5. Before TJR2 starts executing T1, TJR1 finishes execution of T1 and 
cleans it
+            up by clearing the trigger_id of TI1.
+        6. TJR2 tries to execute T1, but it crashes (with the above error) 
while trying to
+            look up TI1 (because T1 no longer has a TaskInstance linked to it).
+    """
+    path = tmp_path / "test_trigger_create_after_completion.txt"
+    trigger = TimeDeltaTrigger_(delta=datetime.timedelta(microseconds=1), 
filename=path.as_posix())
+    trigger_orm = Trigger.from_object(trigger)
+    trigger_orm.id = 1
+    session.add(trigger_orm)
+
+    dag = DagModel(dag_id="test-dag")
+    dag_run = DagRun(dag.dag_id, run_id="abc", run_type="none")
+    ti = TaskInstance(
+        PythonOperator(task_id="dummy-task", python_callable=print),
+        run_id=dag_run.run_id,
+        state=TaskInstanceState.DEFERRED,
+    )
+    ti.dag_id = dag.dag_id
+    ti.trigger_id = 1
+    session.add(dag)
+    session.add(dag_run)
+    session.add(ti)
+
+    job1 = Job()
+    job2 = Job()
+    session.add(job1)
+    session.add(job2)
+
+    session.commit()
+
+    class TriggerRunnerWithCreateCount_(TriggerRunner):
+        async def create_triggers(self):
+            num_triggers_to_create = len(self.to_create)
+            await super().create_triggers()
+            self.trigger_creation_count = getattr(self, 
"trigger_creation_count", 0) + num_triggers_to_create
+
+    class TriggerRunnerWithUpdateDelay_(TriggerRunnerWithCreateCount_):
+        """TriggerRunner with a 5 second delay added at the beginning of 
update_triggers
+        to increase the window that the race condition may occur.
+        """
+
+        def update_triggers(self, *args, **kwargs):
+            # Delay calling update_triggers to increase the window of 
opportunity
+            time.sleep(5)
+            super().update_triggers(*args, **kwargs)
+
+        async def create_triggers(self):
+            await super().create_triggers()
+            self.create_triggers_count = getattr(self, 
"create_triggers_count", 0) + 1
+
+    class TriggererJobRunner_(TriggererJobRunner):
+        """TriggererJobRunner whose handle_events blocks until there is an 
event."""
+
+        def load_triggers(self):
+            super().load_triggers()
+            self.load_triggers_count = getattr(self, "load_triggers_count", 0) 
+ 1
+
+        def handle_events(self):
+            # Wait for event during the first loop
+            while not self.trigger_runner.events and getattr(self, 
"handle_events_count", 0) == 0:
+                time.sleep(0.1)
+            super().handle_events()
+            self.handle_events_count = getattr(self, "handle_events_count", 0) 
+ 1
+            # Prevent Trigger.clean_unused() from deleting the trigger
+            time.sleep(5)

Review Comment:
   Alright, I changed the test to take a slightly different approach, calling 
triggerer functions manually so that the execution timings are more tightly 
controlled. Let me know how it looks!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to