kaxil commented on code in PR #67750:
URL: https://github.com/apache/airflow/pull/67750#discussion_r3327142448
##########
airflow-core/src/airflow/dag_processing/manager.py:
##########
@@ -1509,18 +1512,23 @@ def _add_files_to_queue(
):
"""Add stuff to the back or front of the file queue, unless it's already present."""
if mode == "frontprio":
+ # Move (or insert) each file to the front, even if already queued. Iterating in
+ # order and pushing each to the front reproduces the old remove()+appendleft() order.
for file in files:
- # Try removing the file if already present
- with contextlib.suppress(ValueError):
- self._file_queue.remove(file)
- # enqueue file to the start of the queue.
- self._file_queue.appendleft(file)
+ self._file_queue[file] = None
Review Comment:
`bundle_path` is `compare=False` on `DagFileInfo`, so
`self._file_queue[file] = None` on an already-queued equal key keeps the
**original** key object and only updates the value, whereas the old
`deque.remove(file)` + `appendleft(file)` replaced it with the new object. So
an equal-but-fresher info (e.g. from a `claim_priority_files` override that
sources requests from an API, per its docstring) would keep a stale
`bundle_path`, which `_start_new_processes` would then use for parsing.
This isn't reachable with in-tree bundles today (priority and regular files both
resolve `bundle_path` from the same `self._dag_bundles` object, and
`bundle_version` is part of equality), but this is meant to be a
behavior-preserving refactor and this one path isn't identical. Calling
`self._file_queue.pop(file, None)` before the insert restores the old
replace-on-requeue semantics:
```python
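for file in files:
    # Drop any equal key first so the fresh object (with its current
    # bundle_path) replaces the stale one instead of being ignored.
    self._file_queue.pop(file, None)
    self._file_queue[file] = None
    # New keys land at the end; move this one to the front of the queue.
    self._file_queue.move_to_end(file, last=False)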
```
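A sketch checking that the suggested loop matches the old `deque` path in both ordering and key replacement. It assumes `_file_queue` is a `collections.OrderedDict` (plain `dict` has no `move_to_end`); `FakeFileInfo`, `old_frontprio` and `new_frontprio` are hypothetical helpers for illustration, not Airflow code.

```python
import contextlib
from collections import OrderedDict, deque
from dataclasses import dataclass, field


@dataclass(frozen=True)
class FakeFileInfo:
    """Hypothetical stand-in for DagFileInfo; bundle_path is ignored by eq/hash."""

    rel_path: str
    bundle_path: str = field(compare=False)


def old_frontprio(queue: deque, files: list) -> None:
    """Old deque behavior: remove any equal entry, then push to the front."""
    for file in files:
        with contextlib.suppress(ValueError):
            queue.remove(file)
        queue.appendleft(file)


def new_frontprio(queue: OrderedDict, files: list) -> None:
    """Suggested OrderedDict behavior: pop, re-insert, then move to the front."""
    for file in files:
        queue.pop(file, None)
        queue[file] = None
        queue.move_to_end(file, last=False)


existing = [FakeFileInfo("a.py", "/old"), FakeFileInfo("b.py", "/old"), FakeFileInfo("c.py", "/old")]
incoming = [FakeFileInfo("c.py", "/new"), FakeFileInfo("d.py", "/new")]

old_q = deque(existing)
old_frontprio(old_q, incoming)

new_q = OrderedDict.fromkeys(existing)
new_frontprio(new_q, incoming)

# Same order, and the fresher bundle_path wins in both implementations.
assert list(old_q) == list(new_q)
assert [f.bundle_path for f in old_q] == [f.bundle_path for f in new_q]
print([(f.rel_path, f.bundle_path) for f in new_q])
# [('d.py', '/new'), ('c.py', '/new'), ('a.py', '/old'), ('b.py', '/old')]
```

Without the `pop`, the `c.py` entry would keep `/old` in `new_q` while `old_q` would hold `/new`, which is exactly the divergence the comment points out.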
--
This is an automated message from the Apache Git Service.