uranusjr commented on code in PR #38666:
URL: https://github.com/apache/airflow/pull/38666#discussion_r1579075206


##########
tests/jobs/test_triggerer_job.py:
##########
@@ -309,6 +309,88 @@ def test_update_trigger_with_triggerer_argument_change(
         assert "got an unexpected keyword argument 'not_exists_arg'" in caplog.text
 
 
+def test_trigger_create_race_condition_38599(session, tmp_path):
+    """
+    This verifies the resolution of race condition documented in github issue #38599.
+    More details in the issue description.
+
+    The race condition may occur in the following scenario:
+        1. TaskInstance TI1 defers itself, which creates Trigger T1, which holds a
+            reference to TI1.
+        2. T1 gets picked up by TriggererJobRunner TJR1 and starts running T1.
+        3. TJR1 misses a heartbeat, most likely due to high host load causing delays in
+            each TriggererJobRunner._run_trigger_loop loop.
+        4. A second TriggererJobRunner TJR2 notices that T1 has missed its heartbeat,
+            so it starts the process of picking up any Triggers that TJR1 may have had,
+            including T1.
+        5. Before TJR2 starts executing T1, TJR1 finishes execution of T1 and cleans it
+            up by clearing the trigger_id of TI1.
+        6. TJR2 tries to execute T1, but it crashes (with the above error) while trying to
+            look up TI1 (because T1 no longer has a TaskInstance linked to it).
+    """
+    path = tmp_path / "test_trigger_create_after_completion.txt"
+    trigger = TimeDeltaTrigger_(delta=datetime.timedelta(microseconds=1), filename=path.as_posix())
+    trigger_orm = Trigger.from_object(trigger)
+    trigger_orm.id = 1
+    session.add(trigger_orm)
+
+    dag = DagModel(dag_id="test-dag")
+    dag_run = DagRun(dag.dag_id, run_id="abc", run_type="none")
+    ti = TaskInstance(
+        PythonOperator(task_id="dummy-task", python_callable=print),
+        run_id=dag_run.run_id,
+        state=TaskInstanceState.DEFERRED,
+    )
+    ti.dag_id = dag.dag_id
+    ti.trigger_id = 1
+    session.add(dag)
+    session.add(dag_run)
+    session.add(ti)
+
+    job1 = Job()
+    job2 = Job()
+    session.add(job1)
+    session.add(job2)
+
+    session.commit()
+
+    job_runner1 = TriggererJobRunner(job1)
+    job_runner2 = TriggererJobRunner(job2)
+
+    # Assign and run the trigger on the first TriggererJobRunner
+    # Instead of running job_runner1._execute, we will run the individual methods
+    # to control the timing of the execution.
+    job_runner1.load_triggers()
+    assert len(job_runner1.trigger_runner.to_create) == 1
+    # Before calling job_runner1.handle_events, run the trigger synchronously
+    loop = asyncio.get_event_loop()
+    loop.run_until_complete(job_runner1.trigger_runner.create_triggers())
+    assert len(job_runner1.trigger_runner.triggers) == 1
+    trigger_task_key = list(job_runner1.trigger_runner.triggers.keys())[0]
+    trigger_task_info = job_runner1.trigger_runner.triggers[trigger_task_key]
+    loop.run_until_complete(trigger_task_info["task"])
+    assert trigger_task_info["task"].done()
+
+    # In a real execution environment, a missed heartbeat would cause the trigger to be picked up
+    # by another TriggererJobRunner.
+    # In this test, however, this is not necessary because we are controlling the execution
+    # of the TriggererJobRunner.
+    # job1.latest_heartbeat = timezone.utcnow() - datetime.timedelta(hours=1)
+    # session.commit()
+
+    # This calls Trigger.submit_event, which will unlink the trigger from the task instance
+    job_runner1.handle_events()
+
+    # Simulate the second TriggererJobRunner picking up the trigger
+    job_runner2.trigger_runner.update_triggers({trigger_orm.id})
+    # The race condition happens here.
+    # AttributeError: 'NoneType' object has no attribute 'dag_id'
+    loop.run_until_complete(job_runner2.trigger_runner.create_triggers())
+
+    instances = path.read_text().splitlines()
+    assert instances == ["hi"]

Review Comment:
   The `splitlines` part seems unnecessary. Why not just check the content directly as a string?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to