1fanwang commented on code in PR #67043:
URL: https://github.com/apache/airflow/pull/67043#discussion_r3358840514


##########
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##########
@@ -1288,7 +1290,29 @@ def process_executor_events(
                 cls.logger().error("Callback %s failed: %s", callback_id, callback.output)
             session.add(callback)
 
-        # Return if no finished tasks
+        for key in queued_tis:
+            try_number = ti_primary_key_to_try_number_map[key.primary]
+            if key.try_number != try_number:
+                continue
+            _, info = event_buffer[key]
+            result = cast(
+                "CursorResult",
+                session.execute(
+                    update(TI)
+                    .where(
+                        TI.dag_id == key.dag_id,
+                        TI.task_id == key.task_id,
+                        TI.run_id == key.run_id,
+                        TI.map_index == key.map_index,
+                        TI.try_number == try_number,
+                    )
+                    .values(external_executor_id=info)
+                ),
+            )
+            if result.rowcount:

Review Comment:
   Splitting queued events onto an unlocked UPDATE is the right shape here.
   
   One question on the `try_number` filter plus this `if result.rowcount` guard: if a task instance's `try_number` advances between dispatch and event drain, the UPDATE matches nothing and the event is left un-popped, whereas the old locked path set `external_executor_id` and consumed the event unconditionally. Is that intended? If so, it is arguably cleaner (no stale-id write); I just want to confirm the un-popped event doesn't skew the `len(event_buffer)` returned below.
   



##########
airflow-core/tests/unit/jobs/test_scheduler_job.py:
##########
@@ -914,6 +914,69 @@ def test_process_executor_events_multiple_try_numbers_warns(
         mock_task_callback.assert_not_called()
         mock_stats.incr.assert_not_called()
 
+    def test_process_executor_events_queued_updates_without_row_lock(self, dag_maker, session):
+        dag_id = "test_process_executor_events_queued_updates_without_row_lock"
+        executor_id = "queued_executor_id"
+
+        with dag_maker(dag_id=dag_id, fileloc="/test_path1/"):
+            task = EmptyOperator(task_id="dummy_task")
+        ti = dag_maker.create_dagrun().get_task_instance(task.task_id)
+        ti.state = State.QUEUED
+        session.merge(ti)
+        session.commit()
+
+        executor = MockExecutor(do_update=False)
+        scheduler_job = Job()
+        self.job_runner = SchedulerJobRunner(scheduler_job, executors=[executor])
+
+        executor.event_buffer[ti.key] = State.QUEUED, executor_id
+
+        with mock.patch(
+            "airflow.jobs.scheduler_job_runner.with_row_locks", wraps=with_row_locks
+        ) as row_locks:
+            self.job_runner._process_executor_events(executor=executor, session=session)
+
+        ti.refresh_from_db(session=session)
+        assert ti.external_executor_id == executor_id
+        assert ti.key not in executor.event_buffer
+        row_locks.assert_not_called()

Review Comment:
   Solid regression guard: it asserts that the row lock is skipped for queued-only batches.
   
   The symptom in the issue was the MySQL error 1213 deadlock under concurrent access, which this test can't exercise (SQLite, single thread). That shape reproduces cheaply with a Dockerized MySQL 8 and a two-thread `FOR UPDATE` script. A before/after comparison on the hot path would let a scheduler committer merge without taking the deadlock fix on faith.
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

