The main cause here is that we reuse our dag runs when we shouldn't, but resolving that is a big project.
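To make that concrete, here's a rough, self-contained sketch of why reusing the run matters for Deadline Alerts. This is not Airflow's actual model or code; FakeDagRun, clear_run, and deadline_missed are made up for illustration. The point is just that clearing mutates the same run object, so a deadline keyed off queued_at never restarts:

```
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone


@dataclass
class FakeDagRun:
    # Hypothetical stand-in for a DagRun row; only the fields we care about here.
    state: str
    queued_at: datetime


def clear_run(dr: FakeDagRun) -> None:
    # Mirrors the behavior described in the thread: the *same* run object is
    # reused, its state is reset, and queued_at is left untouched.
    dr.state = "queued" if dr.state in ("success", "failed") else "rerunning"


def deadline_missed(dr: FakeDagRun, window: timedelta) -> bool:
    # A DAGRUN_QUEUED_AT-style deadline: "alert if queued longer than window".
    return datetime.now(timezone.utc) - dr.queued_at > window


run = FakeDagRun(state="failed", queued_at=datetime.now(timezone.utc) - timedelta(hours=2))
clear_run(run)
# The run is queued again, but a 30-minute deadline is already "missed" because
# queued_at still points at the first time it was queued, two hours ago.
print(deadline_missed(run, timedelta(minutes=30)))  # True
```

If we created a fresh run (or at least refreshed the timestamp) on clear, that check would start from zero again.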
On Fri, Dec 12, 2025 at 4:40 PM Ferruzzi, Dennis <[email protected]> wrote:

> I'll amend my proposal a little with a less-intrusive alternative. We can
> leave the RUNNING -> RERUNNING transition alone entirely, revert my
> TERMINAL -> QUEUED transition fix, and make the following change in the
> clear_task_instances method [1] instead:
>
> ```
> for dr in drs:
>     if dr.state in State.finished_dr_states:
>         # ... existing code ...
>
>     # Always update queued_at when clearing, regardless of the old state
>     if dag_run_state == DagRunState.QUEUED or dr.state == DagRunState.RUNNING:
>         dr.queued_at = timezone.utcnow()
>         dr.clear_number += 1
> ```
>
> That would be super clean, and the only effective change would be that
> queued_at will always reflect the most recent time the run was marked as
> ready for scheduling, which I feel was the intent of that column (but I may
> be wrong).
>
> [1]
> https://github.com/apache/airflow/blob/main/airflow-core/src/airflow/models/taskinstance.py#L192
>
> - ferruzzi
>
> ________________________________
> From: Stefan Wang <[email protected]>
> Sent: Wednesday, December 10, 2025 9:57 PM
> To: [email protected]
> Subject: RE: [EXT] DagRun queued_at timestamp discussion
>
> Hi Dennis,
>
> Thanks for the great investigation work and for opening this discussion!
>
> PR #59066 was a clean fix, and I really appreciated how you dug into the
> SQLAlchemy behavior to understand what was happening. Here are my thoughts
> (though I'm relatively new to the community and still catching up on
> discussions pre-2023, so please take my input as just one perspective) :p
>
> The PR solves the immediate problem well. Updating queued_at when clearing
> finished runs makes sense semantically and fixes the Deadline Alert issue.
> The explicit assignment is clear and sidesteps the property-setter
> persistence issue nicely.
>
> 1. RUNNING → RERUNNING vs. RUNNING → QUEUED
> I can see why this feels unintuitive. My guess is RERUNNING exists to
> preserve "this was already executing" information, which might be useful
> for monitoring/debugging, but I don't have context on the original design
> decisions here. This feels like a bigger state machine question; it would
> be interesting to see thoughts from the community. IIUC, Deadline Alerts
> should be okay if this stays as-is for now though.
>
> 2. Should queued_at update for RERUNNING too?
> PR #59066 updated queued_at for finished → QUEUED but not for RUNNING →
> RERUNNING. For Deadline Alert consistency, it probably should update in
> both cases. The semantic tension is that RERUNNING doesn't actually go
> through a "queued" stage, but pragmatically it's "starting the execution
> attempt again," so resetting the timer makes sense to me.
>
> 3. queued_at vs. last_queued_at Column
> In a "perfect world" I'd prefer renaming the existing queued_at column to
> something like last_queued_at, submitted_at, or last_attempt_at for clearer
> semantics. However, I can see how that kind of breaking change would be a
> migration headache without strong enough justification.
> Keeping queued_at as-is seems pragmatic.
>
> 4. Queue Time Arrays
> Agreed, this could be a bit of over-engineering at this stage.
>
> Thanks,
> Stefan
>
> On Dec 9, 2025, at 2:04 PM, Ferruzzi, Dennis <[email protected]> wrote:
>
> > There's a good bit of context here to get to where I'm going, but the
> > TLDR is "Should the DagRun's queued_at time get updated when a user
> > clears the Dag run?"
> >
> > Initial Question: When a user has a Deadline using the
> > DeadlineReference.DAGRUN_QUEUED_AT and the Dag run gets cleared,
> > shouldn't the timer restart?
> > Initial Answer: Surely. If my deadline says "email me if this has been
> > queued for more than a half hour", then I would expect re-queuing the
> > run to reset that timer.
> >
> > I was surprised to learn that it doesn't, so I did some digging and
> > found a few related bits that feel odd to me. I'm looking to start up a
> > discussion about whether they are intentional and needed or if they can
> > safely be fixed. Given the code structure, they appear to be intentional
> > decisions, so I want to make sure I'm not missing context before
> > changing things.
> >
> > What's currently happening:
> >
> > When a user clicks the "clear dag run" button in the UI, we land in
> > models/taskinstance.py::clear_task_instances [1]. If the run was in a
> > terminal state, the state is set to QUEUED; if the run was RUNNING, it
> > skips the queue and goes straight to RERUNNING. Neither case changes the
> > queued_at timestamp. [2]
> >
> > I did some digging and it looks like this goes back to a commit in 2023
> > [3], where there was an Issue [4] requesting that start_date should not
> > get changed (which I agree with). I suspect that queued_at was just
> > collateral damage here and that this is safe to fix, but I want to see
> > if anyone knows of a reason it should remain how it is.
> >
> > Discussion Points:
> >
> > 1. Why does a finished run get sent to the queue, but a RUNNING dag run
> > skip the queue, when it is cleared? That seems unintuitive, and I would
> > expect it to get tossed to the queue for the scheduler to pick back up
> > regardless of the current state.
> >
> > 2. Bikeshed time! IF a change is needed (TBD), should we update
> > queued_at or add a new column, perhaps last_queued_at, to avoid breaking
> > anyone's workflow? If we're adding a new column, should it be a single
> > last_queued_at, or should it be an array showing every time the run was
> > re-queued? If the latter, do we add an API so users can access that?
> >
> > My thoughts and proposal:
> >
> > I feel like any time the run is cleared it should get tossed back on the
> > queue regardless of the current state, and doing so should update the
> > existing queued_at column. It's simple, clean, and predictable. If we
> > decide that is a breaking change, then my backup proposal is to always
> > re-queue and add a single-timestamp, non-nullable last_queued_at column
> > which defaults to match queued_at at init and gets updated each time the
> > run is re-queued.
> >
> > [1]
> > https://github.com/apache/airflow/blob/main/airflow-core/src/airflow/models/taskinstance.py#L192
> > [2] This is now a lie: I changed the terminal state to update in
> > https://github.com/apache/airflow/pull/59066, but for now let's leave
> > that out of the discussion. It may need to be reverted or modified
> > depending on how this discussion goes.
> > [3] https://github.com/apache/airflow/pull/30125
> > [4] https://github.com/apache/airflow/issues/30124
> >
> > - ferruzzi
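If the new-column route from the thread wins out, the backup proposal (a last_queued_at that starts out matching queued_at and is refreshed on every re-queue) could look roughly like the sketch below. The model and column wiring here are illustrative SQLAlchemy under my own assumptions, not Airflow's actual DagRun model or a migration:

```
from datetime import datetime, timezone

from sqlalchemy import Column, DateTime, String
from sqlalchemy.orm import declarative_base

Base = declarative_base()


def utcnow() -> datetime:
    return datetime.now(timezone.utc)


class SketchDagRun(Base):
    # Illustrative stand-in for the real DagRun model, not Airflow code.
    __tablename__ = "sketch_dag_run"

    run_id = Column(String, primary_key=True)
    state = Column(String, nullable=False)
    queued_at = Column(DateTime(timezone=True), nullable=True)
    # New non-nullable column; in the proposal it starts out equal to queued_at
    # (utcnow here is just a placeholder default for the sketch).
    last_queued_at = Column(DateTime(timezone=True), nullable=False, default=utcnow)

    def requeue(self) -> None:
        # Every clear/re-queue refreshes last_queued_at, so a deadline based on
        # "time since last queued" restarts; queued_at keeps its original value.
        self.state = "queued"
        self.last_queued_at = utcnow()
```

That way queued_at keeps its current meaning for anyone relying on it today, while deadline logic could switch to last_queued_at.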
