safaehar commented on code in PR #68068:
URL: https://github.com/apache/airflow/pull/68068#discussion_r3394622086
##########
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##########
@@ -1314,12 +1319,16 @@ def process_executor_events(
# Return if no finished tasks
if not tis_with_right_state:
- return len(event_buffer)
+ stats.gauge("scheduler.executor_events.batch_size", num_events)
+ stats.incr("scheduler.executor_events.processed", num_events)
+ return num_events
Review Comment:
Good catch — fixed in 79f3197. All three returns (both early returns and the
final one) now `return len(event_buffer)` to preserve the original
remaining-count contract that feeds `num_finished_events`/idle detection.
`num_events = len(event_buffer)` is captured once at the top (before the pops)
and used only for the `batch_size`/`processed` metric emission, so the metric
reports the full batch size while the return value keeps its original meaning.
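A minimal sketch of the pattern described above. It captures the batch size once, emits metrics from that snapshot, and returns the *remaining* buffer length on every path. It makes several assumptions. `RecordingStats`, `FINISHED_STATES`, and `process_events` are hypothetical stand-ins, not Airflow APIs. The event buffer is modeled as a plain `dict` of key to state. Following the comment's mention of "the pops", handled events are assumed to be popped from the buffer.

```python
from collections import defaultdict


# Hypothetical stand-in for the scheduler's stats client: it only records
# calls so the sketch runs without a metrics backend.
class RecordingStats:
    def __init__(self):
        self.gauges = {}
        self.counters = defaultdict(int)

    def gauge(self, name, value):
        self.gauges[name] = value

    def incr(self, name, count=1):
        self.counters[name] += count


# Hypothetical set of states this sketch treats as "handled in this pass".
FINISHED_STATES = {"success", "failed"}


def process_events(event_buffer, stats):
    """Emit metrics for the full batch; return the remaining-count contract."""
    # Snapshot the batch size once, before any pops shrink the buffer.
    num_events = len(event_buffer)

    def emit_metrics():
        stats.gauge("scheduler.executor_events.batch_size", num_events)
        stats.incr("scheduler.executor_events.processed", num_events)

    finished = [key for key, state in event_buffer.items() if state in FINISHED_STATES]
    if not finished:
        emit_metrics()
        # Early return still reports what is left in the buffer.
        return len(event_buffer)

    # Hypothetical processing step: pop the events handled in this pass.
    for key in finished:
        event_buffer.pop(key)

    emit_metrics()
    # Final return: remaining count, not the batch size.
    return len(event_buffer)
```

With a buffer of three events where two are finished, the metrics report a batch of 3 while the function returns 1, the count still left in the buffer. Keeping the two numbers separate means callers that rely on the return value for idle detection see no behavior change.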
##########
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##########
@@ -3131,6 +3143,12 @@ def adopt_or_reset_orphaned_tasks(self, *, session: Session = NEW_SESSION) -> in
stats.incr("scheduler.orphaned_tasks.cleared", len(to_reset))
stats.incr("scheduler.orphaned_tasks.adopted", len(tis_to_adopt_or_reset) - len(to_reset))
+ if to_reset:
+ stats.incr(
+ "scheduler.zombies.detected",
+ len(to_reset),
+ tags={"reason": "adopt_failure"},
+ )
if to_reset:
task_instance_str = "\n\t".join(reset_tis_message)
Review Comment:
Done in 79f3197 — merged into a single `if to_reset:` block containing both
the `scheduler.zombies.detected` metric and the log message.
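A rough sketch of the merged block, where one `if to_reset:` guard covers both the zombie metric and the log line. Some details are assumptions. `RecordingStats` and `report_orphans` are hypothetical helpers, not Airflow code. The log message wording is illustrative only. Only the metric names and the `tags={"reason": "adopt_failure"}` argument come from the diff above.

```python
import logging
from collections import defaultdict

log = logging.getLogger(__name__)


# Hypothetical stats stub: counters are keyed by (metric name, sorted tags).
class RecordingStats:
    def __init__(self):
        self.counters = defaultdict(int)

    def incr(self, name, count=1, tags=None):
        key = (name, tuple(sorted((tags or {}).items())))
        self.counters[key] += count


def report_orphans(tis_to_adopt_or_reset, to_reset, reset_tis_message, stats):
    stats.incr("scheduler.orphaned_tasks.cleared", len(to_reset))
    stats.incr("scheduler.orphaned_tasks.adopted", len(tis_to_adopt_or_reset) - len(to_reset))
    # Single guard: the zombie metric and the log line share one branch,
    # so they can never disagree about whether anything was reset.
    if to_reset:
        stats.incr("scheduler.zombies.detected", len(to_reset), tags={"reason": "adopt_failure"})
        task_instance_str = "\n\t".join(reset_tis_message)
        # Illustrative message text; the real log wording may differ.
        log.info("Reset %s orphaned task instances:\n\t%s", len(to_reset), task_instance_str)
```

When `to_reset` is empty, neither the `scheduler.zombies.detected` counter nor the log line is emitted. The `cleared` and `adopted` counters are still updated unconditionally, matching the unchanged context lines in the diff.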
--
This is an automated message from the Apache Git Service.