There's a good bit of context here to get to where I'm going, but the TLDR is 
"Should the DagRun's queued_at time get updated when a user clears the Dag run?"

Initial Question:  When a user has a Deadline using the 
DeadlineReference.DAGRUN_QUEUED_AT and the Dag run gets cleared, shouldn't the 
timer restart?
Initial Answer:  Surely.  If my deadline says "email me if this has been queued 
for more than a half hour", then I would expect re-queuing the run to reset 
that timer.
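To make the timer semantics concrete, here's a minimal sketch using plain 
datetime arithmetic (this is NOT the actual Deadline implementation; the 
half-hour interval and the deadline_for helper are just stand-ins for the 
example above) of why a stale queued_at matters:

```python
from datetime import datetime, timedelta, timezone

INTERVAL = timedelta(minutes=30)  # "queued for more than a half hour"

def deadline_for(queued_at: datetime) -> datetime:
    # A DAGRUN_QUEUED_AT-style deadline is anchored to queued_at.
    return queued_at + INTERVAL

# The run was first queued an hour ago.
first_queued = datetime.now(timezone.utc) - timedelta(hours=1)

# The run is cleared and re-queued now, but queued_at is NOT updated,
# so the deadline is still computed from the original timestamp...
stale_deadline = deadline_for(first_queued)
assert stale_deadline < datetime.now(timezone.utc)  # already missed

# ...whereas resetting queued_at on re-queue restarts the timer.
fresh_deadline = deadline_for(datetime.now(timezone.utc))
assert fresh_deadline > datetime.now(timezone.utc)
```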

I was surprised to learn that it doesn't, so I did some digging and found a few 
related bits that feel odd to me.  I'm looking to start up a discussion about 
whether they are intentional and needed or if they can safely be fixed. Given 
the code structure, they appear to be intentional decisions, so I want to make 
sure I'm not missing context before changing things.

What's currently happening:

When a user clicks the "clear dag run" button in the UI, we land in 
models/taskinstance.py::clear_task_instance [1].  If the run was in a terminal 
state, its state is set to QUEUED; if the run was RUNNING, it skips the queue 
and goes straight to RERUNNING.  Neither case changes the queued_at 
timestamp. [2]
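As a toy model of that flow (simplified, hypothetical types for illustration, 
not the real clear_task_instance code), the current behavior looks roughly 
like:

```python
from dataclasses import dataclass
from datetime import datetime, timezone
from enum import Enum

class DagRunState(str, Enum):
    QUEUED = "queued"
    RUNNING = "running"
    RERUNNING = "rerunning"
    SUCCESS = "success"
    FAILED = "failed"

TERMINAL = {DagRunState.SUCCESS, DagRunState.FAILED}

@dataclass
class DagRun:
    state: DagRunState
    queued_at: datetime

def clear_dag_run(run: DagRun) -> None:
    # Current behavior: terminal runs go back to QUEUED, RUNNING runs
    # skip the queue and go straight to RERUNNING.  queued_at is left
    # untouched in both branches.
    if run.state in TERMINAL:
        run.state = DagRunState.QUEUED
    elif run.state == DagRunState.RUNNING:
        run.state = DagRunState.RERUNNING

# A run that was queued long ago and already finished:
run = DagRun(DagRunState.SUCCESS,
             queued_at=datetime(2025, 1, 1, tzinfo=timezone.utc))
before = run.queued_at
clear_dag_run(run)
assert run.state == DagRunState.QUEUED
assert run.queued_at == before  # the timer anchor never moves
```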

I did some digging and it looks like this goes back to a 2023 commit [3] 
addressing an Issue [4] that requested start_date not be changed on clear 
(which I agree with).  I suspect queued_at was just collateral damage there 
and this is safe to fix, but I want to see if anyone knows of a reason it 
should remain how it is.

Discussion Points:

1. Why does a finished run get sent to the queue, but a RUNNING dag run skip 
the queue when it is cleared?  That seems unintuitive; I would expect the run 
to get tossed to the queue for the scheduler to pick back up regardless of 
its current state.
2. Bikeshed time!  IF a change is needed (TBD), should we update queued_at or 
add a new column (perhaps last_queued_at) to avoid breaking anyone's 
workflow?  If we're adding a new column, should it be a single last_queued_at 
timestamp, or an array recording every time the run was re-queued?  If the 
latter, do we add an API so users can access that history?

My thoughts and proposal:

I feel like any time the run is cleared it should get tossed back on the 
queue regardless of its current state, and doing so should update the 
existing queued_at column.  It's simple, clean, and predictable.  If we 
decide that is a breaking change, then my backup proposal is to always 
re-queue and add a non-nullable single-timestamp last_queued_at column which 
defaults to match queued_at at init and gets updated each time the run is 
re-queued.
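A minimal standalone sketch of that backup proposal (last_queued_at is the 
hypothetical new column, and the types here are simplified stand-ins, not 
Airflow's actual models):

```python
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Optional

def _utcnow() -> datetime:
    return datetime.now(timezone.utc)

@dataclass
class DagRun:
    state: str
    queued_at: datetime
    # Effectively non-nullable: defaults to queued_at at init.
    last_queued_at: Optional[datetime] = None

    def __post_init__(self):
        if self.last_queued_at is None:
            self.last_queued_at = self.queued_at

def clear_dag_run(run: DagRun) -> None:
    # Proposal: always re-queue, regardless of the current state,
    # and bump last_queued_at while leaving queued_at untouched.
    run.state = "queued"
    run.last_queued_at = _utcnow()

run = DagRun("running", queued_at=datetime(2025, 1, 1, tzinfo=timezone.utc))
assert run.last_queued_at == run.queued_at   # matches at init
clear_dag_run(run)
assert run.state == "queued"                 # even a RUNNING run is re-queued
assert run.last_queued_at > run.queued_at    # the timer anchor resets
```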


[1] 
https://github.com/apache/airflow/blob/main/airflow-core/src/airflow/models/taskinstance.py#L192
[2] This is now a lie, I changed the terminal state to update in 
https://github.com/apache/airflow/pull/59066 but for now let's leave that out 
of the discussion.  It may need to be reverted or modified depending on how 
this discussion goes.
[3] https://github.com/apache/airflow/pull/30125
[4] https://github.com/apache/airflow/issues/30124




 - ferruzzi
