hussein-awala commented on code in PR #68068:
URL: https://github.com/apache/airflow/pull/68068#discussion_r3394347032
##########
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##########
@@ -3131,6 +3143,12 @@ def adopt_or_reset_orphaned_tasks(self, *, session: Session = NEW_SESSION) -> in
stats.incr("scheduler.orphaned_tasks.cleared", len(to_reset))
stats.incr("scheduler.orphaned_tasks.adopted", len(tis_to_adopt_or_reset) - len(to_reset))
+ if to_reset:
+ stats.incr(
+ "scheduler.zombies.detected",
+ len(to_reset),
+ tags={"reason": "adopt_failure"},
+ )
if to_reset:
task_instance_str = "\n\t".join(reset_tis_message)
Review Comment:
These two `if to_reset:` checks can be merged into a single block.
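A rough sketch of the merged shape follows. `report_orphans` and the `RecordingStats` stub are hypothetical. The stub stands in for the scheduler's stats client and only records `incr()` calls. The `log.info(...)` line is a placeholder for whatever the existing `if to_reset:` branch already does with `task_instance_str`.

```python
from __future__ import annotations

import logging
from collections import defaultdict

log = logging.getLogger(__name__)


class RecordingStats:
    """Hypothetical stand-in for the stats client; it only records incr() calls."""

    def __init__(self) -> None:
        self.counters: dict[tuple[str, tuple], int] = defaultdict(int)

    def incr(self, name: str, count: int = 1, tags: dict | None = None) -> None:
        # Key each counter by metric name plus its (sorted) tags.
        key = (name, tuple(sorted((tags or {}).items())))
        self.counters[key] += count


def report_orphans(stats, tis_to_adopt_or_reset, to_reset, reset_tis_message):
    stats.incr("scheduler.orphaned_tasks.cleared", len(to_reset))
    stats.incr("scheduler.orphaned_tasks.adopted", len(tis_to_adopt_or_reset) - len(to_reset))
    # Single guard: the new zombie metric and the existing reset handling share one block.
    if to_reset:
        stats.incr("scheduler.zombies.detected", len(to_reset), tags={"reason": "adopt_failure"})
        task_instance_str = "\n\t".join(reset_tis_message)
        # Placeholder for the existing body of the original `if to_reset:` branch.
        log.info("Reset the following %s orphaned TaskInstances:\n\t%s", len(to_reset), task_instance_str)
```

The behavior is the same as the two separate checks. Merging them just avoids evaluating the same condition twice and keeps all the reset-related handling in one place.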
##########
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##########
@@ -1314,12 +1319,16 @@ def process_executor_events(
# Return if no finished tasks
if not tis_with_right_state:
- return len(event_buffer)
+ stats.gauge("scheduler.executor_events.batch_size", num_events)
+ stats.incr("scheduler.executor_events.processed", num_events)
+ return num_events
Review Comment:
This changes the behavior of the existing method. `event_buffer.pop(...)`
shrinks the buffer as events are processed, so the original
`return len(event_buffer)` returns the remaining count, not the total. That value
feeds `num_finished_events` and the scheduler's idle detection, so let's keep the
behavior unchanged here. It might be a pre-existing bug; I'll dig into it and
fix it in a separate PR if needed.
Keep `num_events` for the metric emission, but return `len(event_buffer)` in
all three return statements (the two early returns and the final one).
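Below is a simplified sketch of the suggested shape. It assumes the event buffer is a plain dict. `process_events`, `finished_keys`, and the `RecordingStats` stub are hypothetical and are not Airflow's actual code. The sketch has only one early return instead of two, and it emits the metrics once up front rather than before each return. It shows why the two numbers diverge: `num_events` is captured before any `pop()`, while `len(event_buffer)` reflects what is left afterwards.

```python
from __future__ import annotations


class RecordingStats:
    """Hypothetical stats stub: keeps the last gauge value and sums counters."""

    def __init__(self) -> None:
        self.gauges: dict[str, float] = {}
        self.counters: dict[str, int] = {}

    def gauge(self, name: str, value: float) -> None:
        self.gauges[name] = value

    def incr(self, name: str, count: int = 1) -> None:
        self.counters[name] = self.counters.get(name, 0) + count


def process_events(event_buffer: dict, finished_keys: set, stats: RecordingStats) -> int:
    # Total number of events, captured before the buffer is mutated; used only for metrics.
    num_events = len(event_buffer)
    stats.gauge("scheduler.executor_events.batch_size", num_events)
    stats.incr("scheduler.executor_events.processed", num_events)

    to_handle = [key for key in event_buffer if key in finished_keys]
    if not to_handle:
        # Early return: nothing has been popped yet, so this equals num_events here.
        return len(event_buffer)

    for key in to_handle:
        event_buffer.pop(key)  # shrinks the buffer, mirroring event_buffer.pop(...) in the scheduler

    # Original contract preserved: return the number of events still left in the buffer.
    return len(event_buffer)
```

Suppose the buffer holds three events and two of them are finished. The metrics then report 3, but the function returns 1. That remaining-count value is what keeps feeding `num_finished_events` unchanged.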