kaxil commented on code in PR #63489:
URL: https://github.com/apache/airflow/pull/63489#discussion_r2992229990
##########
airflow-core/src/airflow/models/trigger.py:
##########
@@ -286,6 +297,48 @@ def submit_event(cls, trigger_id, event: TriggerEvent, session: Session = NEW_SE
         if trigger.callback:
             trigger.callback.handle_event(event, session)
+
+    @classmethod
+    def return_to_worker(cls, task_instance: TaskInstance, session: Session) -> None:
+        """
+        Return a task instance to the worker for execution.
+
+        This is used when a trigger fires an event and we need to resume a deferred task instance.
+        It is optimized for configured executor types to directly enqueue to a worker without going
+        through the scheduler, but can also be used as a general utility to return a task instance
+        to the worker for execution.
+        """
+        from airflow.executors import workloads
+        from airflow.executors.executor_loader import ExecutorLoader
+
+        # Remove ourselves as its trigger
+        task_instance.trigger_id = None
+        task_instance.scheduled_dttm = timezone.utcnow()
+
+        if cls.direct_queueing_allowlist:
+            # Resolve team name for multi-team setups
+            team_name: str | None = None
+            if cls.multi_team:
+                team_name = task_instance.dag_model.get_team_name(task_instance.dag_id, session=session)
+
+            executor = ExecutorLoader.find_executor(get_executors(), task_instance.executor, team_name)
+
+            # Only directly enqueue if the executor is in the configured allowlist
+            if executor is not None and executor.name is not None:
+                executor_identifiers = {
+                    executor.name.alias,
+                    executor.name.module_path,
+                    executor.name.module_path.split(".")[-1],
+                }
+                if executor_identifiers & set(cls.direct_queueing_allowlist):
+                    task_instance.state = TaskInstanceState.QUEUED
+                    workload = workloads.ExecuteTask.make(ti=task_instance, generator=executor.jwt_generator)
+                    executor.queue_workload(workload, session=session)
+                    executor.trigger_tasks(1)  # Flush all == 1 task to the worker immediately
Review Comment:
`trigger_tasks(1)` sends the task to the Celery broker, but the session has
not committed yet -- only a `session.flush()` happens later (line 538 in
`handle_event_submit`). If a Celery worker picks up the message before the
triggerer's transaction commits, it will read stale state (the TI is not yet
QUEUED in the DB).
The window is small but measurable under load, especially with the Celery
`visibility_timeout` defaults.
##########
airflow-core/src/airflow/executors/executor_loader.py:
##########
@@ -311,6 +311,43 @@ def init_executors(cls) -> list[BaseExecutor]:
         return loaded_executors
+
+    @classmethod
+    def find_executor(
+        cls,
+        executors: list[BaseExecutor],
+        executor_name: str | None,
+        team_name: str | None,
+    ) -> BaseExecutor | None:
+        """
+        Find the executor matching the given name and team from a list of executor instances.
+
+        :param executors: List of available executor instances to search through.
+        :param executor_name: The executor name (alias, module path, or class name) to find.
+            Pass None to get the default executor for the given team.
+        :param team_name: The team to find the executor for (None for global executors).
+        :return: The matching executor instance, or None if not found.
+        """
+        if executor_name is None:
+            if team_name:
+                # Find the default executor for the given team
+                for _executor in executors:
+                    if _executor.team_name == team_name:
+                        return _executor
+            # No team-specific executor found, fall back to global default
+            return executors[0] if executors else None
+
+        # An executor name is specified -- search by alias, module path, or class name
+        for _executor in executors:
+            if _executor.name and executor_name in (
+                _executor.name.alias,
+                _executor.name.module_path,
+                _executor.name.module_path.split(".")[-1],
+            ):
+                # Must match the team or be a global executor (team_name is None)
+                if team_name and _executor.team_name == team_name or _executor.team_name is None:
Review Comment:
Operator precedence bug. `and` binds tighter than `or`, so this evaluates as:
```python
(team_name and _executor.team_name == team_name) or (_executor.team_name is None)
```
So when `team_name` is provided, any *global* executor (one whose `_executor.team_name` is `None`) that passes the name check above also satisfies the team filter. A team-scoped search for "CeleryExecutor" would silently fall back to the global CeleryExecutor instead of being restricted to the team's own instance.
Needs parentheses:
```python
if (team_name and _executor.team_name == team_name) or (
    not team_name and _executor.team_name is None
):
```
Or split into explicit branches for clarity.
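The trap is easy to check in isolation; `buggy_match`/`fixed_match` below are hypothetical stand-ins for the condition, not Airflow code:

```python
# `and` binds tighter than `or`. With team_name set, the buggy form still
# accepts any executor whose team is None (a global executor), because the
# trailing `or executor_team is None` escapes the team check entirely.
def buggy_match(team_name, executor_team):
    # parsed as: (team_name and executor_team == team_name) or (executor_team is None)
    return team_name and executor_team == team_name or executor_team is None

def fixed_match(team_name, executor_team):
    return (team_name and executor_team == team_name) or (
        not team_name and executor_team is None
    )

# Searching on behalf of team "a": a global executor should not satisfy the
# team filter, but the buggy version lets it through.
print(buggy_match("a", None))  # True  (wrong: global executor matches)
print(fixed_match("a", None))  # False (correct)

# Both agree when no team is requested: only global executors match.
print(buggy_match(None, None), fixed_match(None, None))  # True True
```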
##########
airflow-core/src/airflow/models/trigger.py:
##########
@@ -72,6 +72,14 @@ class TriggerFailureReason(str, Enum):
TRIGGER_FAILURE = "Trigger failure"
+@cache
+def get_executors():
+    """Load configured executors (cached) and find the one responsible for this task instance."""
+    from airflow.executors.executor_loader import ExecutorLoader
+
Review Comment:
This is a fundamental architecture concern: the triggerer instantiating executor instances via `init_executors()` turns it into a mini-scheduler + worker dispatcher.

Today, only the scheduler owns the executor lifecycle -- creating instances, managing state, generating JWT tokens, dispatching to worker queues. This PR gives the triggerer all of those responsibilities too:

1. **Executor ownership**: `init_executors()` creates full executor instances (connections, JWT generators, queue handlers)
2. **JWT generation**: `executor.jwt_generator` to sign workloads
3. **Worker dispatch**: `queue_workload()` + `trigger_tasks(1)` sends directly to the Celery broker

This conflicts with [AIP-92](https://cwiki.apache.org/confluence/display/AIRFLOW/%5BWIP%5D+AIP-92+Isolate+DAG+processor%2C+Callback+processor%2C+and+Triggerer+from+core+services), which is moving the triggerer in the opposite direction -- isolating it from core services and removing even its direct DB access in favor of API-only communication. If AIP-92 wants the triggerer to lose DB access, giving it executor ownership and broker dispatch is a step backward.

The triggerer should remain a "client" that reports events back to the scheduler (or API server), not become a second scheduler that also dispatches work to executors. Could the scheduler instead be made aware that deferred-returning tasks should skip re-evaluation and go straight to QUEUED? That keeps the dispatch responsibility where it belongs.
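That alternative could look roughly like this. A toy sketch with invented names (`returning_from_deferral`, `scheduler_loop`), not an Airflow API:

```python
from enum import Enum

class State(Enum):
    DEFERRED = "deferred"
    SCHEDULED = "scheduled"
    QUEUED = "queued"

class TI:
    def __init__(self) -> None:
        self.state = State.DEFERRED
        self.returning_from_deferral = False  # hypothetical marker column

def triggerer_handle_event(ti: TI) -> None:
    # The triggerer stays a client: it records that the deferral finished
    # and never touches an executor, JWT generator, or broker.
    ti.state = State.SCHEDULED
    ti.returning_from_deferral = True

def scheduler_loop(ti: TI, enqueue) -> None:
    # The scheduler keeps dispatch ownership; the marker lets it skip the
    # usual re-evaluation and move the TI straight to QUEUED.
    if ti.state is State.SCHEDULED and ti.returning_from_deferral:
        ti.state = State.QUEUED
        ti.returning_from_deferral = False
        enqueue(ti)

queued: list[TI] = []
ti = TI()
triggerer_handle_event(ti)
scheduler_loop(ti, queued.append)
print(ti.state.value)  # queued
```

The latency cost is one scheduler loop iteration, but dispatch, JWT signing, and broker access stay in one place, consistent with the AIP-92 direction.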
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]