ferruzzi commented on code in PR #61153:
URL: https://github.com/apache/airflow/pull/61153#discussion_r2791464599


##########
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##########
@@ -979,6 +985,70 @@ def _critical_section_enqueue_task_instances(self, 
session: Session) -> int:
 
         return len(queued_tis)
 
+    def _enqueue_executor_callbacks(self, session: Session) -> None:
+        """
+        Enqueue ExecutorCallback workloads to executors.
+
+        Similar to _enqueue_task_instances, but for callbacks that need to run 
on executors.
+        Queries for QUEUED ExecutorCallback instances and routes them to the 
appropriate executor.
+
+        :param session: The database session
+        """
+        num_occupied_slots = sum(executor.slots_occupied for executor in 
self.job.executors)
+        max_callbacks = conf.getint("core", "parallelism") - num_occupied_slots
+
+        if max_callbacks <= 0:
+            self.log.debug("No available slots for callbacks; all executors at 
capacity")
+            return
+
+        queued_callbacks = session.scalars(
+            select(ExecutorCallback)
+            .where(ExecutorCallback.type == CallbackType.EXECUTOR)
+            .where(ExecutorCallback.state == CallbackState.QUEUED)
+            .order_by(ExecutorCallback.priority_weight.desc())
+            .limit(max_callbacks)
+        ).all()
+
+        if not queued_callbacks:
+            return
+
+        # Route callbacks to executors using the generalized routing method
+        executor_to_callbacks = self._executor_to_workloads(queued_callbacks, 
session)
+
+        # Enqueue callbacks for each executor
+        for executor, callbacks in executor_to_callbacks.items():
+            for callback in callbacks:
+                if not isinstance(callback, ExecutorCallback):
+                    # Can't happen since we queried ExecutorCallback, but 
satisfies mypy.
+                    continue
+                dag_run = None
+                if isinstance(callback.data, dict) and "dag_run_id" in 
callback.data:
+                    dag_run_id = callback.data["dag_run_id"]
+                    dag_run = session.get(DagRun, dag_run_id)
+                elif isinstance(callback.data, dict) and "dag_id" in 
callback.data:
+                    # Fallback: try to find the latest dag_run for the dag_id
+                    dag_id = callback.data["dag_id"]
+                    dag_run = session.scalars(
+                        select(DagRun)
+                        .where(DagRun.dag_id == dag_id)
+                        .order_by(DagRun.execution_date.desc())
+                        .limit(1)
+                    ).first()
+
+                if dag_run is None:
+                    self.log.warning("Could not find DagRun for callback %s", 
callback.id)
+                    continue
+
+                workload = workloads.ExecuteCallback.make(
+                    callback=callback,
+                    dag_run=dag_run,
+                    generator=executor.jwt_generator,
+                )
+
+                executor.queue_workload(workload, session=session)
+                callback.state = CallbackState.RUNNING

Review Comment:
   I'm not sure it's behind, but the callbacks are missing a QUEUED state.
   
   - PENDING in the Callback.init()
   - QUEUED by Deadline.handle_miss
   - RUNNING here, after we've queued it... yeah, alright, I see what you mean.
   
   I'll add a SCHEDULED state and realign these.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to