nailo2c commented on PR #58543:
URL: https://github.com/apache/airflow/pull/58543#issuecomment-4187487283

   > 1. Where exactly is the logic for "2. Assign AssetAndTimeSchedule to the 
non_asset_dags set" added?
   
   First, we need to ensure `AssetAndTimeSchedule` is not in 
`triggered_date_by_dag`.
   ```python
   query, triggered_date_by_dag = DagModel.dags_needing_dagruns(session)
   ```
   
   We added a condition in `dags_needing_dagruns` to remove 
`AssetAndTimeSchedule` from `adrq_by_dag`:
   ```python
   if not isinstance(timetable, AssetTriggeredTimetable):
       del adrq_by_dag[dag_id]
       continue
   
   ...
   
   triggered_date_by_dag: dict[str, datetime] = {
       dag_id: max(adrq.created_at for adrq in adrqs) for dag_id, adrqs in adrq_by_dag.items()
   }
   ```
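
To make the filtering step concrete, here is a minimal, self-contained sketch. The classes below are toy stand-ins, not the real Airflow classes; in particular it assumes `AssetAndTimeSchedule` is not an instance of `AssetTriggeredTimetable`, and `QueuedEvent`, `latest_trigger_by_dag`, and the `timetables` mapping are hypothetical names used only for illustration.

```python
from dataclasses import dataclass
from datetime import datetime


class AssetTriggeredTimetable:
    """Toy stand-in; only the class name comes from the PR discussion."""


class AssetAndTimeSchedule:
    """Toy stand-in; assumed NOT to subclass AssetTriggeredTimetable."""


@dataclass
class QueuedEvent:
    """Hypothetical stand-in for one queued asset event (an ADRQ row)."""
    created_at: datetime


def latest_trigger_by_dag(adrq_by_dag, timetables):
    """Drop Dags that are not purely asset-triggered, then take the newest event per Dag."""
    remaining = dict(adrq_by_dag)  # copy so the caller's mapping is untouched
    for dag_id in list(remaining):
        if not isinstance(timetables[dag_id], AssetTriggeredTimetable):
            del remaining[dag_id]
    return {
        dag_id: max(event.created_at for event in events)
        for dag_id, events in remaining.items()
    }
```

With one Dag of each kind, only the purely asset-triggered Dag should end up with a trigger date; the `AssetAndTimeSchedule` Dag is filtered out even though it has queued events.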
   
   Finally, `AssetAndTimeSchedule` will fall into `non_asset_dags` since we 
know it's not in `asset_triggered_dags`:
   ```python
   all_dags_needing_dag_runs = set(query.all())
   asset_triggered_dags = [d for d in all_dags_needing_dag_runs if d.dag_id in triggered_date_by_dag]
   non_asset_dags = {
       d
       # filter asset-triggered Dags
       # (this line is what places `AssetAndTimeSchedule` in `non_asset_dags`)
       for d in all_dags_needing_dag_runs.difference(asset_triggered_dags)
       # filter asset partition triggered Dags
       if d.dag_id not in partition_dag_ids
   }
   ```
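
The set arithmetic above can be checked in isolation with a small sketch. `Dag` is a hypothetical frozen dataclass standing in for a `DagModel` row, and `partition_dags` is an illustrative helper, not a function in the scheduler.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Dag:
    """Hypothetical, hashable stand-in for a DagModel row."""
    dag_id: str


def partition_dags(all_dags, triggered_date_by_dag, partition_dag_ids):
    """Split Dags into asset-triggered ones and the rest, excluding partition-triggered Dags."""
    asset_triggered = [d for d in all_dags if d.dag_id in triggered_date_by_dag]
    non_asset = {
        d
        # anything without an entry in triggered_date_by_dag lands here
        for d in all_dags.difference(asset_triggered)
        if d.dag_id not in partition_dag_ids
    }
    return asset_triggered, non_asset
```

A Dag whose id is missing from `triggered_date_by_dag` (as an `AssetAndTimeSchedule` Dag would be after the filter) falls into the second set, provided it is not in `partition_dag_ids`.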
   
   (Sorry for not using permalinks in the PR description; the links went 
stale and caused confusion.)
   
   ---
   
   > 2. Can you explain the purpose of the timeout logic? For example, if a Dag 
is scheduled for 17:00, does it wait until 17:00 + timeout? If the asset events 
do not arrive, the task fails — what happens to the existing ADRQs? Are they 
cleared, or do they count toward the next schedule?
   
   The purpose is to prevent a backlog of queued runs when the schedule 
frequency is higher than the asset production rate.
   
   In the original issue's scenario, the schedule and asset production are 1:1, 
so a backlog is unlikely, but the timeout still serves as a safety net if the 
upstream simply fails to produce the asset.
   
   > + For example, if a Dag is scheduled for 17:00, does it wait until 17:00 + 
timeout?
   
   Yes, it will wait until 17:00 + timeout.
   
   > + If the asset events do not arrive, the task fails — what happens to the 
existing ADRQs? Are they cleared, or do they count toward the next schedule?
   
   They count toward the next schedule run.
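
The answers above can be summarized as a small decision function. This is only a sketch of the described behavior, not the PR's implementation: `evaluate`, the state names, the strict-inequality deadline, and the assumption that queued events are consumed when a run is created are all hypothetical.

```python
from datetime import datetime, timedelta


def evaluate(now, scheduled_at, timeout, required_assets, queued_assets):
    """Return (decision, queue carried forward) for one scheduled slot.

    Hypothetical model: a run needs both the scheduled time to have passed
    and every required asset to have a queued event.
    """
    if now < scheduled_at:
        # Asset events may already be queued, but no run is created yet.
        return "wait_for_schedule", set(queued_assets)
    if required_assets <= queued_assets:
        # Assumption: creating the run consumes the queued events.
        return "create_run", set()
    if now < scheduled_at + timeout:
        return "wait_for_assets", set(queued_assets)
    # Timed out: the queued events are kept and count toward the next schedule.
    return "timed_out", set(queued_assets)
```

For a 17:00 schedule with a 30-minute timeout, a call at 17:10 with only some of the assets queued keeps waiting, while a call after 17:30 times out and returns the partial queue unchanged.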
   
   ---
   
   > 3. What will the UI look like in each waiting state? Specifically:
   > + If asset events have arrived but the scheduled time hasn't come yet, 
does the UI indicate we're waiting for the scheduled time?
   
   No, the UI will show that the asset has been produced, but the Dag run won't 
be created until the scheduled time.
   
   <img width="1917" height="670" alt="Screenshot 2026-04-03 at 10 39 09 PM" src="https://github.com/user-attachments/assets/c0e8f116-0321-4fd4-9a08-83503487897b" />
   
   > + If the scheduled time has passed but the asset event hasn't arrived yet, 
does the UI show which assets are still being waited on?
   
   <img width="1910" height="703" alt="Screenshot 2026-04-04 at 10 29 24 AM" src="https://github.com/user-attachments/assets/183bf247-e17e-4dfe-93ac-153a8a371c86" />
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
