nailo2c commented on PR #58543:
URL: https://github.com/apache/airflow/pull/58543#issuecomment-4187487283
> 1. Where exactly is the logic for "2. Assign AssetAndTimeSchedule to the
non_asset_dags set" added?
First, we need to ensure `AssetAndTimeSchedule` is not in
`triggered_date_by_dag`.
```python
query, triggered_date_by_dag = DagModel.dags_needing_dagruns(session)
```
We added a condition in `dags_needing_dagruns` to remove
`AssetAndTimeSchedule` from `adrq_by_dag`:
```python
if not isinstance(timetable, AssetTriggeredTimetable):
    del adrq_by_dag[dag_id]
    continue
...
triggered_date_by_dag: dict[str, datetime] = {
    dag_id: max(adrq.created_at for adrq in adrqs) for dag_id, adrqs in adrq_by_dag.items()
}
```
Finally, `AssetAndTimeSchedule` will fall into `non_asset_dags` since we
know it's not in `asset_triggered_dags`:
```python
all_dags_needing_dag_runs = set(query.all())
asset_triggered_dags = [d for d in all_dags_needing_dag_runs if d.dag_id in triggered_date_by_dag]
non_asset_dags = {
    d
    # filter asset-triggered Dags
    # (this line puts `AssetAndTimeSchedule` Dags in `non_asset_dags`)
    for d in all_dags_needing_dag_runs.difference(asset_triggered_dags)
    # filter asset partition triggered Dags
    if d.dag_id not in partition_dag_ids
}
```
(Sorry for not using permalinks in the PR description; the links are now
outdated and have caused confusion.)
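
To make the three steps above concrete, here is a minimal, self-contained sketch of the same flow. Everything in it is a hypothetical stand-in, not the real Airflow code: the two timetable classes are empty placeholders, `Dag`, `Adrq`, and `split_dags` are names made up for illustration, and it assumes (as the `isinstance` check implies) that `AssetAndTimeSchedule` is not an `AssetTriggeredTimetable` instance.

```python
from dataclasses import dataclass
from datetime import datetime


# Hypothetical placeholders; NOT the real Airflow timetable classes.
class AssetTriggeredTimetable: ...
class AssetAndTimeSchedule: ...  # assumed not to subclass AssetTriggeredTimetable


@dataclass(frozen=True)
class Dag:  # stand-in for DagModel
    dag_id: str


@dataclass(frozen=True)
class Adrq:  # stand-in for a queued asset event record
    created_at: datetime


def split_dags(dags, timetables, adrq_by_dag, partition_dag_ids=frozenset()):
    adrq_by_dag = dict(adrq_by_dag)  # work on a copy
    # Step 1: drop Dags whose timetable is not purely asset-triggered.
    for dag_id in list(adrq_by_dag):
        if not isinstance(timetables[dag_id], AssetTriggeredTimetable):
            del adrq_by_dag[dag_id]
    # Step 2: only the remaining Dags get a triggered date.
    triggered_date_by_dag = {
        dag_id: max(adrq.created_at for adrq in adrqs)
        for dag_id, adrqs in adrq_by_dag.items()
    }
    # Step 3: whatever is not asset-triggered falls into non_asset_dags.
    all_dags = set(dags)
    asset_triggered_dags = [d for d in all_dags if d.dag_id in triggered_date_by_dag]
    non_asset_dags = {
        d
        for d in all_dags.difference(asset_triggered_dags)
        if d.dag_id not in partition_dag_ids
    }
    return asset_triggered_dags, non_asset_dags, triggered_date_by_dag
```

Called with one Dag per timetable type, both having queued events, only the purely asset-triggered Dag ends up in `asset_triggered_dags`; the `AssetAndTimeSchedule` Dag lands in `non_asset_dags`.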
---
> 2. Can you explain the purpose of the timeout logic? For example, if a Dag
is scheduled for 17:00, does it wait until 17:00 + timeout? If the asset events
do not arrive, the task fails — what happens to the existing ADRQs? Are they
cleared, or do they count toward the next schedule?
The purpose is to prevent a backlog of queued runs when the schedule
frequency is higher than the asset production rate.
In the original issue's scenario, the schedule and asset production are 1:1,
so backlog is unlikely, but the timeout still serves as a safety net if the
upstream simply fails to produce the asset.
> + For example, if a Dag is scheduled for 17:00, does it wait until 17:00 +
timeout?
Yes, it will wait until 17:00 + timeout.
> + If the asset events do not arrive, the task fails — what happens to the
existing ADRQs? Are they cleared, or do they count toward the next schedule?
They count toward the next schedule run.
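
As a rough illustration of the answers above, the waiting behaviour can be modelled as below. This is only a sketch under stated assumptions, not the PR's implementation: `decide` and `process_slot` are hypothetical names, queued asset events are modelled as a plain set of asset names, the deadline is treated as reached when `now >= scheduled_at + timeout`, and a successful run is assumed to consume the queued events it needed.

```python
from datetime import datetime, timedelta


def decide(now, scheduled_at, timeout, assets_ready):
    """Return 'wait', 'run', or 'timed_out' for one scheduled slot (hypothetical)."""
    if now < scheduled_at:
        return "wait"  # scheduled time not reached, even if assets already arrived
    if assets_ready:
        return "run"
    if now < scheduled_at + timeout:
        return "wait"  # still inside the window [scheduled_at, scheduled_at + timeout)
    return "timed_out"


def process_slot(now, scheduled_at, timeout, required, queued):
    """Return (outcome, remaining queued events) without mutating the inputs."""
    outcome = decide(now, scheduled_at, timeout, required <= queued)
    if outcome == "run":
        return outcome, queued - required  # assumption: a run consumes its events
    # On 'wait' or 'timed_out' the queue is untouched, so events that did
    # arrive still count toward the next scheduled run.
    return outcome, set(queued)
```

For a 17:00 slot with a 30-minute timeout and only one of two required assets queued, this model waits until 17:30, reports `timed_out`, and keeps the queued event so that the next slot needs only the missing asset.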
---
> 3. What will the UI look like in each waiting state? Specifically:
> + If asset events have arrived but the scheduled time hasn't come yet,
does the UI indicate we're waiting for the scheduled time?
No, the UI will show that the asset has been produced, but the Dag run
won't be created until the scheduled time.
<img width="1917" height="670" alt="Screenshot 2026-04-03 at 10 39 09 PM"
src="https://github.com/user-attachments/assets/c0e8f116-0321-4fd4-9a08-83503487897b"
/>
> + If the scheduled time has passed but the asset event hasn't arrived yet,
does the UI show which assets are still being waited on?
<img width="1910" height="703" alt="Screenshot 2026-04-04 at 10 29 24 AM"
src="https://github.com/user-attachments/assets/183bf247-e17e-4dfe-93ac-153a8a371c86"
/>