kaxil commented on code in PR #63489:
URL: https://github.com/apache/airflow/pull/63489#discussion_r2992229990


##########
airflow-core/src/airflow/models/trigger.py:
##########
@@ -286,6 +297,48 @@ def submit_event(cls, trigger_id, event: TriggerEvent, session: Session = NEW_SE
         if trigger.callback:
             trigger.callback.handle_event(event, session)
 
+    @classmethod
+    def return_to_worker(cls, task_instance: TaskInstance, session: Session) -> None:
+        """
+        Return a task instance to the worker for execution.
+
+        This is used when a trigger fires an event and we need to resume a deferred task instance.
+        It is optimized for configured executor types to directly enqueue to a worker without going
+        through the scheduler, but can also be used as a general utility to return a task instance
+        to the worker for execution.
+        """
+        from airflow.executors import workloads
+        from airflow.executors.executor_loader import ExecutorLoader
+
+        # Remove ourselves as its trigger
+        task_instance.trigger_id = None
+        task_instance.scheduled_dttm = timezone.utcnow()
+
+        if cls.direct_queueing_allowlist:
+            # Resolve team name for multi-team setups
+            team_name: str | None = None
+            if cls.multi_team:
+                team_name = task_instance.dag_model.get_team_name(task_instance.dag_id, session=session)
+
+            executor = ExecutorLoader.find_executor(get_executors(), task_instance.executor, team_name)
+
+            # Only directly enqueue if the executor is in the configured allowlist
+            if executor is not None and executor.name is not None:
+                executor_identifiers = {
+                    executor.name.alias,
+                    executor.name.module_path,
+                    executor.name.module_path.split(".")[-1],
+                }
+                if executor_identifiers & set(cls.direct_queueing_allowlist):
+                    task_instance.state = TaskInstanceState.QUEUED
+                    workload = workloads.ExecuteTask.make(ti=task_instance, generator=executor.jwt_generator)
+                    executor.queue_workload(workload, session=session)
+                    executor.trigger_tasks(1)  # Flush all == 1 task to the worker immediately

Review Comment:
   `trigger_tasks(1)` sends the task to the Celery broker, but the session has 
not committed yet -- only a `session.flush()` happens later (line 538 in 
`handle_event_submit`). If a Celery worker picks up the message before the 
triggerer's transaction commits, it will read stale state (the TI is not yet 
QUEUED in the DB).
   
   The window is small but measurable under load, especially with the Celery 
`visibility_timeout` defaults.
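   To make the ordering concrete, here is a toy, self-contained model (not Airflow code; `flush`, `commit`, `broker`, and `worker_poll` are stand-ins for the session/queue machinery) showing why the broker publish must happen after the commit:
   
   ```python
   # Toy model of the race: "committed_state" is what other transactions can see,
   # "pending_state" holds flushed-but-uncommitted writes, and "broker" stands in
   # for the Celery queue.
   committed_state = {"ti": "deferred"}
   pending_state = {}
   broker = []
   
   def flush(key, value):
       """Like session.flush(): staged in the transaction, invisible to others."""
       pending_state[key] = value
   
   def commit():
       """Like session.commit(): staged writes become visible to other sessions."""
       committed_state.update(pending_state)
       pending_state.clear()
   
   def worker_poll():
       """A worker consumes one message and reads only committed state."""
       if broker:
           broker.pop()
           return committed_state["ti"]
       return None
   
   # Publish-before-commit: the worker can observe stale state.
   flush("ti", "queued")
   broker.append("run ti")
   assert worker_poll() == "deferred"   # stale read: TI not yet QUEUED in the "DB"
   commit()
   
   # Commit-before-publish: the worker always sees the QUEUED state.
   flush("ti", "queued")
   commit()
   broker.append("run ti")
   assert worker_poll() == "queued"
   ```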



##########
airflow-core/src/airflow/executors/executor_loader.py:
##########
@@ -311,6 +311,43 @@ def init_executors(cls) -> list[BaseExecutor]:
 
         return loaded_executors
 
+    @classmethod
+    def find_executor(
+        cls,
+        executors: list[BaseExecutor],
+        executor_name: str | None,
+        team_name: str | None,
+    ) -> BaseExecutor | None:
+        """
+        Find the executor matching the given name and team from a list of executor instances.
+
+        :param executors: List of available executor instances to search through.
+        :param executor_name: The executor name (alias, module path, or class name) to find.
+                             Pass None to get the default executor for the given team.
+        :param team_name: The team to find the executor for (None for global executors).
+        :return: The matching executor instance, or None if not found.
+        """
+        if executor_name is None:
+            if team_name:
+                # Find the default executor for the given team
+                for _executor in executors:
+                    if _executor.team_name == team_name:
+                        return _executor
+            # No team-specific executor found, fall back to global default
+            return executors[0] if executors else None
+
+        # An executor name is specified — search by alias, module path, or class name
+        for _executor in executors:
+            if _executor.name and executor_name in (
+                _executor.name.alias,
+                _executor.name.module_path,
+                _executor.name.module_path.split(".")[-1],
+            ):
+                # Must match the team or be a global executor (team_name is None)
+                if team_name and _executor.team_name == team_name or _executor.team_name is None:

Review Comment:
   Operator precedence bug. This evaluates as:
   ```python
   (team_name and _executor.team_name == team_name) or (_executor.team_name is None)
   ```
   
   So when `team_name` is provided, a name-matching *global* executor (where `_executor.team_name is None`) is accepted even though the caller asked for that team's executor. A search for team-a's "CeleryExecutor" can therefore return the global "CeleryExecutor" instance instead of team-a's, depending only on list order.
   
   Needs parentheses:
   ```python
   if (team_name and _executor.team_name == team_name) or (not team_name and _executor.team_name is None):
   ```
   
   Or split into explicit branches for clarity.
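   A minimal repro of the pitfall with plain values (no Airflow needed; both helpers assume the name check already passed, and the parenthesization in `fixed_match` follows the suggestion above):
   
   ```python
   def buggy_match(team_name, executor_team):
       # mirrors: team_name and _executor.team_name == team_name or _executor.team_name is None
       # `and` binds tighter than `or`, so a global executor always matches
       return bool(team_name and executor_team == team_name or executor_team is None)
   
   def fixed_match(team_name, executor_team):
       # parenthesized: team match when a team is given, global match otherwise
       return bool((team_name and executor_team == team_name)
                   or (not team_name and executor_team is None))
   
   # Team-scoped search against a global executor:
   assert buggy_match("team-a", None)        # wrong: global executor accepted
   assert not fixed_match("team-a", None)    # correct: rejected
   
   # Global search still behaves under the fix:
   assert fixed_match(None, None)            # global executor matches
   assert not fixed_match(None, "team-b")    # team executor does not
   ```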



##########
airflow-core/src/airflow/models/trigger.py:
##########
@@ -72,6 +72,14 @@ class TriggerFailureReason(str, Enum):
     TRIGGER_FAILURE = "Trigger failure"
 
 
+@cache
+def get_executors():
+    """Load configured executors (cached) and find the one responsible for this task instance."""
+    from airflow.executors.executor_loader import ExecutorLoader
+

Review Comment:
   This is a fundamental architecture concern: the triggerer instantiating 
executor instances via `init_executors()` turns it into a mini-scheduler + 
worker dispatcher.
   
   Today, only the scheduler owns executor lifecycle -- creating instances, 
managing state, generating JWT tokens, dispatching to worker queues. This PR 
gives the triggerer all of those responsibilities too:
   
   1. **Executor ownership**: `init_executors()` creates full executor 
instances (connections, JWT generators, queue handlers)
   2. **JWT generation**: `executor.jwt_generator` to sign workloads
   3. **Worker dispatch**: `queue_workload()` + `trigger_tasks(1)` sends 
directly to Celery broker
   
   This conflicts with 
[AIP-92](https://cwiki.apache.org/confluence/display/AIRFLOW/%5BWIP%5D+AIP-92+Isolate+DAG+processor%2C+Callback+processor%2C+and+Triggerer+from+core+services),
 which is moving the triggerer in the opposite direction -- isolating it from 
core services and removing even its direct DB access in favor of API-only 
communication. If AIP-92 wants the triggerer to lose DB access, giving it 
executor ownership and broker dispatch is a step backward.
   
   The triggerer should remain a "client" that reports events back to the 
scheduler (or API server), not become a second scheduler that also dispatches 
work to executors. Could the scheduler instead be made aware that 
deferred-returning tasks should skip re-evaluation and go straight to QUEUED? 
That keeps the dispatch responsibility where it belongs.
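   As a discussion aid, a rough toy sketch of that alternative (plain Python; all names here are invented — `resumed_from_deferral` is a hypothetical marker, not an existing TI field):
   
   ```python
   from enum import Enum
   
   class TIState(Enum):
       DEFERRED = "deferred"
       SCHEDULED = "scheduled"
       QUEUED = "queued"
   
   def triggerer_resume(ti: dict) -> None:
       """Triggerer side: only flip state and record why -- no executor ownership."""
       ti["state"] = TIState.SCHEDULED
       ti["resumed_from_deferral"] = True  # hypothetical flag the scheduler reads
   
   def scheduler_step(ti: dict, deps_met) -> None:
       """Scheduler side: fast-path deferral resumes straight to QUEUED."""
       if ti["state"] is TIState.SCHEDULED:
           if ti.pop("resumed_from_deferral", False) or deps_met(ti):
               ti["state"] = TIState.QUEUED  # dispatch stays in the scheduler
   
   ti = {"state": TIState.DEFERRED}
   triggerer_resume(ti)
   scheduler_step(ti, deps_met=lambda t: False)
   assert ti["state"] is TIState.QUEUED  # resumed without re-running dep checks
   ```
   
   The point of the sketch is only the division of labor: the triggerer writes state, the scheduler keeps sole ownership of executors, JWTs, and broker dispatch.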



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]