Re: [PR] Add max_trigger_to_select_per_loop conf to respect Triggerer HA setup [airflow]

2025-12-08 Thread via GitHub


potiuk merged PR #58803:
URL: https://github.com/apache/airflow/pull/58803


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



Re: [PR] Add max_trigger_to_select_per_loop conf to respect Triggerer HA setup [airflow]

2025-12-08 Thread via GitHub


potiuk commented on PR #58803:
URL: https://github.com/apache/airflow/pull/58803#issuecomment-3629623955

   Very nice!





Re: [PR] Add max_trigger_to_select_per_loop conf to respect Triggerer HA setup [airflow]

2025-12-07 Thread via GitHub


jscheffl commented on code in PR #58803:
URL: https://github.com/apache/airflow/pull/58803#discussion_r2596442686


##
airflow-core/src/airflow/models/trigger.py:
##
@@ -112,6 +113,8 @@ class Trigger(Base):
 
     callback = relationship("Callback", back_populates="trigger", uselist=False)
 
+    max_trigger_to_select_per_loop = conf.getint("triggerer", "max_trigger_to_select_per_loop", fallback=10)

Review Comment:
   plus here?
   ```suggestion
       max_trigger_to_select_per_loop = conf.getint("triggerer", "max_trigger_to_select_per_loop", fallback=50)
   ```






Re: [PR] Add max_trigger_to_select_per_loop conf to respect Triggerer HA setup [airflow]

2025-12-07 Thread via GitHub


jscheffl commented on code in PR #58803:
URL: https://github.com/apache/airflow/pull/58803#discussion_r2596442508


##
airflow-core/src/airflow/config_templates/config.yml:
##
@@ -2324,6 +2324,15 @@ triggerer:
       type: float
       example: ~
       default: "30"
+    max_trigger_to_select_per_loop:
+      description: |
+        Maximum number of triggers to select per loop. Set this notably lower than ``[triggerer] capacity``
+        to keep load balanced across triggerers in HA deployments.
+        Benchmarks show that two triggerers can still claim about 1,000 triggers within one second by default.
+      version_added: 3.2.0
+      type: integer
+      example: ~
+      default: "10"

Review Comment:
   Following your benchmark, should the default then be 50?
   ```suggestion
         default: "50"
   ```






Re: [PR] Add max_trigger_to_select_per_loop conf to respect Triggerer HA setup [airflow]

2025-12-07 Thread via GitHub


jason810496 commented on code in PR #58803:
URL: https://github.com/apache/airflow/pull/58803#discussion_r2596425358


##
airflow-core/src/airflow/config_templates/config.yml:
##
@@ -2324,6 +2324,15 @@ triggerer:
       type: float
       example: ~
       default: "30"
+    max_trigger_to_select_per_loop:
+      description: |
+        Maximum number of triggers to select per loop. Set this notably lower than ``[triggerer] capacity``
+        to keep load balanced across triggerers in HA deployments.
+        Benchmarks show that two triggerers can still claim about 1,000 triggers within one second by default.
+      version_added: 3.2.0
+      type: integer
+      example: ~
+      default: "10"

Review Comment:
   Yes, I actually took the default value from `max_dagruns_to_create_per_loop`, which also defaults to `10`.
   I used a separate session (to minimize the logging overhead in the Triggerer) to run a query that counts triggers grouped by Triggerer (`Job.id`).
   
   The change only affects the [load_triggers](https://github.com/apache/airflow/blob/337aee8/airflow-core/src/airflow/jobs/triggerer_job_runner.py#L527) call in `TriggerSupervisor.run`, not `TriggerRunner.arun`, and each loop _should_ take less than 1 second (between `0.1` and `1.01` seconds on my laptop).
   
   In my previous setup, I switched between two terminals and tried to start both Triggerers at the same time (with a manual delay of about 2 or 3 seconds between the actual starts). I found that we need to set it to `10` to keep the workload balanced enough, as in the screenshot (480:520); otherwise the workloads become skewed (about 3:7).
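The per-Triggerer count mentioned above boils down to a `GROUP BY` over each trigger's owner. A minimal sketch, assuming the `trigger` table stores the owning Triggerer's job id in a `triggerer_id` column; the in-memory SQLite table and its rows are a hypothetical stand-in with sample data mirroring the 480:520 split, not Airflow's real schema or the author's actual query.

```python
from __future__ import annotations

import sqlite3

# Count the triggers currently owned by each Triggerer job.
# Unassigned triggers (triggerer_id IS NULL) are left out.
COUNT_PER_TRIGGERER = """
SELECT triggerer_id, COUNT(*)
FROM "trigger"
WHERE triggerer_id IS NOT NULL
GROUP BY triggerer_id
ORDER BY triggerer_id
"""


def triggers_per_triggerer(conn: sqlite3.Connection) -> dict[int, int]:
    """Return {triggerer job id: number of triggers it holds}."""
    return {triggerer_id: count for triggerer_id, count in conn.execute(COUNT_PER_TRIGGERER)}


# Hypothetical stand-in table (not Airflow's schema) with sample rows.
conn = sqlite3.connect(":memory:")
conn.execute('CREATE TABLE "trigger" (id INTEGER PRIMARY KEY, triggerer_id INTEGER)')
rows = [(1,)] * 480 + [(2,)] * 520 + [(None,)] * 5  # 5 triggers not yet claimed
conn.executemany('INSERT INTO "trigger" (triggerer_id) VALUES (?)', rows)
print(triggers_per_triggerer(conn))  # {1: 480, 2: 520}
```

Running the query from a separate session, as described, keeps the measurement out of the Triggerer's own logging path.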
   






Re: [PR] Add max_trigger_to_select_per_loop conf to respect Triggerer HA setup [airflow]

2025-12-06 Thread via GitHub


jscheffl commented on code in PR #58803:
URL: https://github.com/apache/airflow/pull/58803#discussion_r2595190936


##
airflow-core/src/airflow/config_templates/config.yml:
##
@@ -2324,6 +2324,15 @@ triggerer:
       type: float
       example: ~
       default: "30"
+    max_trigger_to_select_per_loop:
+      description: |
+        Maximum number of triggers to select per loop. Set this notably lower than ``[triggerer] capacity``
+        to keep load balanced across triggerers in HA deployments.
+        Benchmarks show that two triggerers can still claim about 1,000 triggers within one second by default.
+      version_added: 3.2.0
+      type: integer
+      example: ~
+      default: "10"

Review Comment:
   How did you arrive at 10 as a good default? It would mean a single triggerer needs to loop 100 times to pick up 1000 jobs, or 5 triggerers each need to loop 20 times to pick up 1000 jobs?
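The reviewer's arithmetic can be checked with a short helper. This is an idealized, hypothetical model (`loops_to_claim` is not part of Airflow): it assumes every loop of every triggerer claims exactly the per-loop limit, and it ignores capacity caps, contention between triggerers and the per-trigger-type multiplier.

```python
import math


def loops_to_claim(total_triggers: int, per_loop: int, triggerers: int = 1) -> int:
    """Loops each triggerer must run so that together they claim `total_triggers`.

    Idealized: every loop of every triggerer claims exactly `per_loop` triggers.
    """
    return math.ceil(total_triggers / (per_loop * triggerers))


print(loops_to_claim(1000, 10))     # 100 loops for a single triggerer
print(loops_to_claim(1000, 10, 5))  # 20 loops each for five triggerers
print(loops_to_claim(1000, 50, 2))  # 10 loops each for two triggerers with a limit of 50
```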






Re: [PR] Add max_trigger_to_select_per_loop conf to respect Triggerer HA setup [airflow]

2025-11-27 Thread via GitHub


jason810496 closed pull request #58803: Add max_trigger_to_select_per_loop conf to respect Triggerer HA setup
URL: https://github.com/apache/airflow/pull/58803





[PR] Add max_trigger_to_select_per_loop conf to respect Triggerer HA setup [airflow]

2025-11-27 Thread via GitHub


jason810496 opened a new pull request, #58803:
URL: https://github.com/apache/airflow/pull/58803

   ## Why
   
   I found that in a Triggerer HA setup, one Triggerer picks up all the jobs while the other one starves, because we cap `remaining_capacity` at `[triggerer] capacity` instead of a smaller limit that would give the other Triggerer a chance to pick up jobs at the same time.
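A toy model of the starvation described above, assuming two Triggerers that run their selection loops strictly alternately against a shared pool of unassigned triggers. `simulate_claims` and its parameters are hypothetical; this is not Airflow's scheduling code, and real timing is far less regular than perfect alternation.

```python
from __future__ import annotations


def simulate_claims(
    total_triggers: int,
    per_loop_limit: int | None,
    capacity: int = 1000,
    head_start_loops: int = 0,
) -> tuple[int, int]:
    """Return how many triggers each of two triggerers ends up claiming.

    per_loop_limit=None models the pre-fix behaviour: a triggerer may claim up
    to its whole remaining capacity in a single loop.
    """
    unassigned = total_triggers
    claimed = [0, 0]

    def claim(idx: int) -> None:
        nonlocal unassigned
        remaining_capacity = capacity - claimed[idx]
        if per_loop_limit is None:
            limit = remaining_capacity
        else:
            limit = min(per_loop_limit, remaining_capacity)
        take = min(limit, unassigned)
        claimed[idx] += take
        unassigned -= take

    # Triggerer 0 starts slightly earlier (e.g. launched first from another terminal).
    for _ in range(head_start_loops):
        claim(0)
    # Afterwards both triggerers run their loops alternately.
    while unassigned > 0:
        before = unassigned
        claim(0)
        claim(1)
        if unassigned == before:  # both are at capacity; nothing more can be claimed
            break
    return claimed[0], claimed[1]


print(simulate_claims(1000, None))                    # (1000, 0)
print(simulate_claims(1000, 10, head_start_loops=2))  # (510, 490)
```

Without a per-loop limit, whichever Triggerer runs first takes everything; with a small limit, a head start of a couple of loops only shifts the split slightly.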
   
   
   
   Steps to reproduce
   
   Before reproducing, temporarily change `get_hostname` to `return str(os.getpid())`: with a `breeze` setup on the same machine, every Triggerer reports the same hostname, so this is just to distinguish the Triggerers.
   
   
https://github.com/apache/airflow/blob/6798aa79586c37e7f7ca8068bb7f369ed9580926/airflow-core/src/airflow/utils/net.py#L52-L56
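A sketch of what the temporarily patched function could look like. Only the replacement body described above is shown; the real implementation of `get_hostname` at the linked lines is not reproduced, and this patch is meant for local debugging only.

```python
import os


def get_hostname() -> str:
    # TEMPORARY local-debugging patch, not for real deployments:
    # return the process id instead of the machine's hostname so that two
    # Triggerers started on the same machine report different "hostnames".
    return str(os.getpid())


print(get_hostname())  # a process id; differs between the two Triggerer processes
```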
   
   1. Start Airflow with one Triggerer (normal `breeze start-airflow`) and run the following Dag, which produces 1000 mapped deferrable tasks.
   2. Wait until all mapped tasks are in the `deferred` state.
   3. Stop the Triggerer and wait for 30 seconds (for the `[triggerer] job_heartbeat_sec` timeout).
   4. Start two Triggerers in different terminals at the same time (open another terminal, run `breeze exec`, then run `airflow triggerer`).
   5. Observe that only one Triggerer picks up all the jobs while the other one starves.
   
   ```python
   from datetime import timedelta
   from typing import Any

   from airflow.providers.common.compat.sdk import BaseSensorOperator
   from airflow.providers.standard.triggers.temporal import TimeDeltaTrigger
   from airflow.sdk import DAG, Context, task


   class WaitHoursSensor(BaseSensorOperator):
       """Defers immediately and waits `delta` hours in the Triggerer."""

       def __init__(
           self,
           task_id: str,
           trigger_kwargs: dict[str, Any],
           **kwargs: Any,
       ) -> None:
           super().__init__(task_id=task_id, **kwargs)
           self.trigger_kwargs = trigger_kwargs

       def execute(self, context: Context) -> None:
           # Hand the wait off to the Triggerer; the task stays `deferred` meanwhile.
           self.defer(
               trigger=TimeDeltaTrigger(timedelta(hours=self.trigger_kwargs["delta"])),
               method_name="execute_complete",
           )

       def execute_complete(
           self,
           context: Context,
           event: dict[str, Any] | None = None,
       ) -> None:
           print("Wait complete.")


   with DAG(dag_id="ha_triggerer", schedule=None):

       @task
       def items_to_process():
           # 1000 mapped task instances -> 1000 deferred triggers.
           return [{"delta": i + 1} for i in range(1000)]

       WaitHoursSensor.partial(task_id="deferable").expand(
           trigger_kwargs=items_to_process(),
       )
   ```
   
   
   
   https://github.com/user-attachments/assets/7b4c2cb7-e81c-469c-8f98-ec84f4aeff94
   
   - **Before Fix**: One Triggerer picked up `1000` triggers and the other picked up `0` -> starving!
   - **After Fix**: One Triggerer picked up `480` triggers and the other picked up `520`.
   
   
   
   ## What
   
   - Introduce the `[triggerer] max_trigger_to_select_per_loop` config to give every Triggerer a chance to pick up triggers, so that workloads are better balanced in an HA setup.
   - The name `max_trigger_to_select_per_loop` is not entirely accurate, because the same limit is applied to each of the `Callback`, `TaskInstance` and `Asset` trigger queries in the [get_sorted_triggers](https://github.com/apache/airflow/blob/6798aa79586c37e7f7ca8068bb7f369ed9580926/airflow-core/src/airflow/models/trigger.py#L365-L366) loop.
     - The Triggerer's main loop, however, is in [load_triggers](https://github.com/apache/airflow/blob/6798aa79586c37e7f7ca8068bb7f369ed9580926/airflow-core/src/airflow/jobs/triggerer_job_runner.py#L527-L528).
     - This means one iteration of the Triggerer's main loop can actually select up to `number of trigger types (Callback, TI and Asset = 3 for now) * max_trigger_to_select_per_loop (default 10)` triggers, i.e. 30 by default.
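The per-type cap described in the list can be illustrated with a small sketch. It assumes each trigger type is fetched by its own query, in the order Callback, TaskInstance, Asset, and that each query is limited to `min(max_per_loop, capacity still free)`; `select_triggers_for_loop` and `TRIGGER_TYPES` are hypothetical names, and this is not Airflow's `get_sorted_triggers` implementation.

```python
from __future__ import annotations

# Trigger types in the order the PR lists them; each is selected by its own query.
TRIGGER_TYPES = ("callback", "task_instance", "asset")


def select_triggers_for_loop(
    pending_by_type: dict[str, list[int]],
    remaining_capacity: int,
    max_per_loop: int = 10,
) -> list[int]:
    """Pick the trigger ids one main-loop iteration would claim.

    Every per-type query is capped at min(max_per_loop, capacity still free),
    so a single iteration can claim up to len(TRIGGER_TYPES) * max_per_loop.
    """
    selected: list[int] = []
    for trigger_type in TRIGGER_TYPES:
        limit = min(max_per_loop, remaining_capacity - len(selected))
        if limit <= 0:
            break
        selected.extend(pending_by_type.get(trigger_type, [])[:limit])
    return selected


pending = {
    "callback": list(range(0, 100)),
    "task_instance": list(range(100, 1100)),
    "asset": list(range(1100, 1200)),
}
print(len(select_triggers_for_loop(pending, remaining_capacity=1000)))  # 30
print(len(select_triggers_for_loop(pending, remaining_capacity=15)))    # 15
```

When only task-instance triggers are pending, as in the reproduction Dag, the effective per-loop limit is just `max_per_loop`.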

