kaxil commented on PR #63355:
URL: https://github.com/apache/airflow/pull/63355#issuecomment-4049245064
While you already have them running, could you find these please:
1. Could you upload logs from both schedulers when this happens? (full logs
around the time of the race, not just excerpts)
2. What is your `scheduler_health_check_threshold` setting? And how long do
your tasks typically take to execute?
3. Was Scheduler A actually unhealthy, or was it still running fine when
Scheduler B marked its job as failed?
The `DagRun.get_running_dag_runs_to_examine()` uses `SELECT ... FOR UPDATE
SKIP LOCKED` on DagRun rows. This means two schedulers should NOT be able to
process the same DagRun concurrently - Scheduler B would skip any DagRun locked
by Scheduler A:
```py
def get_running_dag_runs_to_examine(cls, session: Session) ->
ScalarResult[DagRun]:
"""
Return the next DagRuns that the scheduler should attempt to schedule.
This will return zero or more DagRun rows that are row-level-locked with
a "SELECT ... FOR UPDATE"
query, you should ensure that any scheduling decisions are made in a
single transaction -- as soon as
the transaction is committed it will be unlocked.
"""
...
return session.scalars(with_row_locks(query, of=cls, session=session,
skip_locked=True)).unique()
```
Full Code:
https://github.com/apache/airflow/blob/267a566e0488b6d91e3983ddd44fa507daa383db/airflow-core/src/airflow/models/dagrun.py#L581-L616
Given this, the `schedule_tis()` race (where two schedulers both increment
try_number on the same TI) shouldn't be possible under normal operation.
From https://github.com/apache/airflow/issues/57618#issuecomment-3684696222
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]