This is an automated email from the ASF dual-hosted git repository.

dstandish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new d52cb341444 Refactor dag processor for code clarity (#46443)
d52cb341444 is described below

commit d52cb341444f0b62752a2d5404f375dbc73f2792
Author: Daniel Standish <[email protected]>
AuthorDate: Fri Feb 7 11:18:29 2025 -0800

    Refactor dag processor for code clarity (#46443)
    
    Dag processor has experienced a lot of organic growth over the years and 
had gotten into a state where it became a bit hard to understand. E.g. method 
set_files did not have a name that clearly indicated its purpose. And that's 
probably in part because it did a bunch of things and was called in a bunch of 
places. I simplify that method and focus on just one aspect of its prior work, 
namely handling removed files, and call it handle_removed_files.
    
    A lot of the behavior was driven by a data structure on the instance called 
file paths. This contained all known files. It's one thing that was modified in 
set_files. From looking at the main loop it was not obvious where this 
structure was being used or modified. So I made it a local variable instead of 
instance attr. And now we can easily see all the methods that are using it 
because it must be passed around.
    
    I rename file_paths to known_files because that is clearer about its 
meaning. Previously it was file_paths and file_queue -- harder to understand 
the different purposes. I make known_files a dictionary because then it's easier 
to replace all the files in the bundle, something that was previously done by 
iterating through the files.
    
    In prepare_file_queue, I pull out the mtime mode logic into its own method 
because it's quite involved and made the prepare_file_queue method too big and 
complicated. Along with this I simplify the logic so that recently processed 
files are not excluded if they were recently changed.
    
    In some of the tests, I had to change the way we simulated mtime numbers 
because the input is now a set which does not guarantee order. So I encode the 
mtime in the filename in the test. And the one test dealing with a zip file was 
apparently flaky. I change it so we don't mock anything but just 
copy the file to a tmp dir and make a bundle there, then remove it and see what 
happens.
    
    In clear_orphaned_import_errors I no longer pass the entire list of known 
dag files, because there could be a lot of them.
---
 airflow/dag_processing/manager.py    | 309 ++++++++++++++++++-----------------
 airflow/models/dag.py                |  21 ++-
 tests/dag_processing/test_manager.py | 218 ++++++++++++++----------
 tests/models/test_dag.py             |   5 +-
 4 files changed, 310 insertions(+), 243 deletions(-)

diff --git a/airflow/dag_processing/manager.py 
b/airflow/dag_processing/manager.py
index 3fb5ac3106d..f720cabbd04 100644
--- a/airflow/dag_processing/manager.py
+++ b/airflow/dag_processing/manager.py
@@ -31,7 +31,7 @@ import sys
 import time
 import zipfile
 from collections import defaultdict, deque
-from collections.abc import Callable, Iterator
+from collections.abc import Callable, Iterable, Iterator
 from dataclasses import dataclass, field
 from datetime import datetime, timedelta
 from importlib import import_module
@@ -41,7 +41,8 @@ from typing import TYPE_CHECKING, Any, NamedTuple, cast
 
 import attrs
 import structlog
-from sqlalchemy import delete, select, tuple_, update
+from sqlalchemy import select, update
+from sqlalchemy.orm import load_only
 from tabulate import tabulate
 from uuid6 import uuid7
 
@@ -174,14 +175,13 @@ class DagFileProcessorManager(LoggingMixin):
     heartbeat: Callable[[], None] = attrs.field(default=lambda: None)
     """An overridable heartbeat called once every time around the loop"""
 
-    _files: list[DagFileInfo] = attrs.field(factory=list, init=False)
     _file_queue: deque[DagFileInfo] = attrs.field(factory=deque, init=False)
     _file_stats: dict[DagFileInfo, DagFileStat] = attrs.field(
         factory=lambda: defaultdict(DagFileStat), init=False
     )
 
     _dag_bundles: list[BaseDagBundle] = attrs.field(factory=list, init=False)
-    _bundle_versions: dict[str, str] = attrs.field(factory=dict, init=False)
+    _bundle_versions: dict[str, str | None] = attrs.field(factory=dict, 
init=False)
 
     _processors: dict[DagFileInfo, DagFileProcessorProcess] = 
attrs.field(factory=dict, init=False)
 
@@ -296,6 +296,8 @@ class DagFileProcessorManager(LoggingMixin):
 
         poll_time = 0.0
 
+        known_files: dict[str, set[DagFileInfo]] = {}
+
         while True:
             loop_start_time = time.monotonic()
 
@@ -303,17 +305,17 @@ class DagFileProcessorManager(LoggingMixin):
 
             self._kill_timed_out_processors()
 
-            self._refresh_dag_bundles()
+            self._refresh_dag_bundles(known_files=known_files)
 
             if not self._file_queue:
                 # Generate more file paths to process if we processed all the 
files already. Note for this to
                 # clear down, we must have cleared all files found from 
scanning the dags dir _and_ have
                 # cleared all files added as a result of callbacks
-                self.prepare_file_queue()
+                self.prepare_file_queue(known_files=known_files)
                 self.emit_metrics()
             else:
                 # if new files found in dag dir, add them
-                self.add_files_to_queue()
+                self.add_files_to_queue(known_files=known_files)
 
             self._refresh_requested_filelocs()
 
@@ -331,7 +333,7 @@ class DagFileProcessorManager(LoggingMixin):
             # Update number of loop iteration.
             self._num_run += 1
 
-            self._print_stat()
+            self.print_stats(known_files=known_files)
 
             if self.max_runs_reached():
                 self.log.info(
@@ -432,7 +434,7 @@ class DagFileProcessorManager(LoggingMixin):
             session.delete(request)
         return filelocs
 
-    def _refresh_dag_bundles(self):
+    def _refresh_dag_bundles(self, known_files: dict[str, set[DagFileInfo]]):
         """Refresh DAG bundles, if required."""
         now = timezone.utcnow()
 
@@ -504,22 +506,19 @@ class DagFileProcessorManager(LoggingMixin):
 
             self._bundle_versions[bundle.name] = version_after_refresh
 
-            found_files = [
+            found_files = {
                 DagFileInfo(rel_path=p, bundle_name=bundle.name, 
bundle_path=bundle.path)
                 for p in self._find_files_in_bundle(bundle)
-            ]
+            }
 
-            # Now that we have the files present in the latest bundle,
-            # we need to update file_paths to include any new files
-            # and remove any files that are no longer in the bundle.
-            # We do this by removing all existing files that are in this bundle
-            # and then adding all the current files back in.
-            new_files = [f for f in self._files if f.bundle_name != 
bundle.name]
-            new_files.extend(found_files)
-            self.set_files(new_files)
+            known_files[bundle.name] = found_files
+            self.handle_removed_files(known_files=known_files)
 
-            self.deactivate_deleted_dags(active_files=found_files)
-            self.clear_nonexistent_import_errors()
+            self.deactivate_deleted_dags(bundle_name=bundle.name, 
present=found_files)
+            self.clear_orphaned_import_errors(
+                bundle_name=bundle.name,
+                observed_filelocs={str(x.absolute_path) for x in found_files}, 
 # todo: make relative
+            )
 
     def _find_files_in_bundle(self, bundle: BaseDagBundle) -> list[Path]:
         """Get relative paths for dag files from bundle dir."""
@@ -530,8 +529,8 @@ class DagFileProcessorManager(LoggingMixin):
 
         return rel_paths
 
-    def deactivate_deleted_dags(self, active_files: list[DagFileInfo]) -> None:
-        """Deactivate DAGs that come from files that are no longer present."""
+    def deactivate_deleted_dags(self, bundle_name: str, present: 
set[DagFileInfo]) -> None:
+        """Deactivate DAGs that come from files that are no longer present in 
bundle."""
 
         def find_zipped_dags(abs_path: os.PathLike) -> Iterator[str]:
             """
@@ -549,37 +548,31 @@ class DagFileProcessorManager(LoggingMixin):
             except zipfile.BadZipFile:
                 self.log.exception("There was an error accessing ZIP file %s 
%s", abs_path)
 
-        present: set[tuple[str, str]] = set()
-        """
-        Tuple containing bundle name and relative fileloc of the dag file.
-
-        If the dag file is embedded in a zip file, the relative fileloc will 
be the
-        zip file path (relative to bundle path) joined with the path to the 
dag file (relative
-        to the zip file path).
-        """
-
-        for info in active_files:
+        rel_filelocs: list[str] = []
+        for info in present:
             abs_path = str(info.absolute_path)
             if abs_path.endswith(".py") or not zipfile.is_zipfile(abs_path):
-                present.add((info.bundle_name, str(info.rel_path)))
+                rel_filelocs.append(str(info.rel_path))
             else:
                 if TYPE_CHECKING:
                     assert info.bundle_path
                 for abs_sub_path in 
find_zipped_dags(abs_path=info.absolute_path):
                     rel_sub_path = 
Path(abs_sub_path).relative_to(info.bundle_path)
-                    present.add((info.bundle_name, str(rel_sub_path)))
+                    rel_filelocs.append(str(rel_sub_path))
 
-        DagModel.deactivate_deleted_dags(present)
+        DagModel.deactivate_deleted_dags(bundle_name=bundle_name, 
rel_filelocs=rel_filelocs)
 
-    def _print_stat(self):
+    def print_stats(self, known_files: dict[str, set[DagFileInfo]]):
         """Occasionally print out stats about how fast the files are getting 
processed."""
         if 0 < self.print_stats_interval < time.monotonic() - 
self.last_stat_print_time:
-            if self._files:
-                self._log_file_processing_stats(self._files)
+            if known_files:
+                self._log_file_processing_stats(known_files=known_files)
             self.last_stat_print_time = time.monotonic()
 
     @provide_session
-    def clear_nonexistent_import_errors(self, session=NEW_SESSION):
+    def clear_orphaned_import_errors(
+        self, bundle_name: str, observed_filelocs: set[str], session: Session 
= NEW_SESSION
+    ):
         """
         Clear import errors for files that no longer exist.
 
@@ -587,22 +580,18 @@ class DagFileProcessorManager(LoggingMixin):
         """
         self.log.debug("Removing old import errors")
         try:
-            query = delete(ParseImportError)
-
-            if self._files:
-                query = query.where(
-                    tuple_(ParseImportError.filename, 
ParseImportError.bundle_name).notin_(
-                        # todo AIP-66: ParseImportError should have rel 
fileloce + bundle name
-                        [(str(f.absolute_path), f.bundle_name) for f in 
self._files]
-                    ),
-                )
-
-            
session.execute(query.execution_options(synchronize_session="fetch"))
-            session.commit()
+            errors = session.scalars(
+                select(ParseImportError)
+                .where(ParseImportError.bundle_name == bundle_name)
+                .options(load_only(ParseImportError.filename))
+            )
+            for error in errors:
+                if error.filename not in observed_filelocs:
+                    session.delete(error)
         except Exception:
             self.log.exception("Error removing old import errors")
 
-    def _log_file_processing_stats(self, known_files):
+    def _log_file_processing_stats(self, known_files: dict[str, 
set[DagFileInfo]]):
         """
         Print out stats about how files are getting processed.
 
@@ -634,32 +623,34 @@ class DagFileProcessorManager(LoggingMixin):
         rows = []
         utcnow = timezone.utcnow()
         now = time.monotonic()
-        for file in known_files:
-            stat = self._file_stats[file]
-            proc = self._processors.get(file)
-            num_dags = stat.num_dags
-            num_errors = stat.import_errors
-            file_name = Path(file.rel_path).stem
-            processor_pid = proc.pid if proc else None
-            processor_start_time = proc.start_time if proc else None
-            runtime = (now - processor_start_time) if processor_start_time 
else None
-            last_run = stat.last_finish_time
-            if last_run:
-                seconds_ago = (utcnow - last_run).total_seconds()
-                
Stats.gauge(f"dag_processing.last_run.seconds_ago.{file_name}", seconds_ago)
-
-            rows.append(
-                (
-                    file.bundle_name,
-                    file.rel_path,
-                    processor_pid,
-                    runtime,
-                    num_dags,
-                    num_errors,
-                    stat.last_duration,
-                    last_run,
+
+        for files in known_files.values():
+            for file in files:
+                stat = self._file_stats[file]
+                proc = self._processors.get(file)
+                num_dags = stat.num_dags
+                num_errors = stat.import_errors
+                file_name = Path(file.rel_path).stem
+                processor_pid = proc.pid if proc else None
+                processor_start_time = proc.start_time if proc else None
+                runtime = (now - processor_start_time) if processor_start_time 
else None
+                last_run = stat.last_finish_time
+                if last_run:
+                    seconds_ago = (utcnow - last_run).total_seconds()
+                    
Stats.gauge(f"dag_processing.last_run.seconds_ago.{file_name}", seconds_ago)
+
+                rows.append(
+                    (
+                        file.bundle_name,
+                        file.rel_path,
+                        processor_pid,
+                        runtime,
+                        num_dags,
+                        num_errors,
+                        stat.last_duration,
+                        last_run,
+                    )
                 )
-            )
 
         # Sort by longest last runtime. (Can't sort None values in python3)
         rows.sort(key=lambda x: x[5] or 0.0, reverse=True)
@@ -699,34 +690,51 @@ class DagFileProcessorManager(LoggingMixin):
 
         self.log.info(log_str)
 
-    def set_files(self, files: list[DagFileInfo]):
+    def handle_removed_files(self, known_files: dict[str, set[DagFileInfo]]):
         """
-        Update the set of files to track in the dag processor.
+        Remove from data structures the files that are missing.
+
+        Also, terminate processes that may be running on those removed files.
 
-        :param files: list of files
+        :param known_files: structure containing known files per-bundle
         :return: None
         """
-        self._files = files
+        files_set: set[DagFileInfo] = set()
+        """Set containing all observed files.
+
+        We consolidate to one set for performance.
+        """
+
+        for v in known_files.values():
+            files_set |= v
 
+        self.purge_removed_files_from_queue(present=files_set)
+        self.terminate_orphan_processes(present=files_set)
+        self.remove_orphaned_file_stats(present=files_set)
+
+    def purge_removed_files_from_queue(self, present: set[DagFileInfo]):
+        """Remove from queue any files no longer observed locally."""
+        self._file_queue = deque(x for x in self._file_queue if x in present)
         Stats.gauge("dag_processing.file_path_queue_size", 
len(self._file_queue))
 
-        # Stop processors that are working on deleted files
-        filtered_processors = {}
-        for file, processor in self._processors.items():
-            if file in files:
-                filtered_processors[file] = processor
-            else:
+    def remove_orphaned_file_stats(self, present: set[DagFileInfo]):
+        """Remove the stats for any dag files that don't exist anymore."""
+        # todo: store stats by bundle also?
+        stats_to_remove = set(self._file_stats).difference(present)
+        for file in stats_to_remove:
+            del self._file_stats[file]
+
+    def terminate_orphan_processes(self, present: set[DagFileInfo]):
+        """Stop processors that are working on deleted files."""
+        for file in list(self._processors.keys()):
+            if file not in present:
+                processor = self._processors.pop(file, None)
+                if not processor:
+                    continue
                 self.log.warning("Stopping processor for %s", file)
                 Stats.decr("dag_processing.processes", tags={"file_path": 
file, "action": "stop"})
                 processor.kill(signal.SIGKILL)
-                self._file_stats.pop(file)
-
-        to_remove = set(self._file_stats).difference(self._files)
-        for file in to_remove:
-            # Remove the stats for any dag files that don't exist anymore
-            del self._file_stats[file]
-
-        self._processors = filtered_processors
+                self._file_stats.pop(file, None)
 
     @provide_session
     def _collect_results(self, session: Session = NEW_SESSION):
@@ -825,14 +833,44 @@ class DagFileProcessorManager(LoggingMixin):
             self._processors[file] = processor
             Stats.gauge("dag_processing.file_path_queue_size", 
len(self._file_queue))
 
-    def add_files_to_queue(self):
-        for file in self._files:
-            if file not in self._file_stats:
-                # We found new file after refreshing dir. add to parsing queue 
at start
-                self.log.info("Adding new file %s to parsing queue", file)
-                self._file_queue.appendleft(file)
+    def add_files_to_queue(self, known_files: dict[str, set[DagFileInfo]]):
+        for files in known_files.values():
+            for file in files:
+                if file not in self._file_stats:  # todo: store stats by 
bundle also?
+                    # We found new file after refreshing dir. add to parsing 
queue at start
+                    self.log.info("Adding new file %s to parsing queue", file)
+                    self._file_queue.appendleft(file)
+
+    def _sort_by_mtime(self, files: Iterable[DagFileInfo]):
+        files_with_mtime: dict[DagFileInfo, float] = {}
+        changed_recently = set()
+        for file in files:
+            try:
+                modified_timestamp = os.path.getmtime(file.absolute_path)
+                modified_datetime = datetime.fromtimestamp(modified_timestamp, 
tz=timezone.utc)
+                files_with_mtime[file] = modified_timestamp
+                last_time = self._file_stats[file].last_finish_time
+                if not last_time:
+                    continue
+                if modified_datetime > last_time:
+                    changed_recently.add(file)
+            except FileNotFoundError:
+                self.log.warning("Skipping processing of missing file: %s", 
file)
+                self._file_stats.pop(file, None)
+                continue
+        file_infos = [info for info, ts in sorted(files_with_mtime.items(), 
key=itemgetter(1), reverse=True)]
+        return file_infos, changed_recently
+
+    def processed_recently(self, now, file):
+        last_time = self._file_stats[file].last_finish_time
+        if not last_time:
+            return False
+        elapsed_ss = (now - last_time).total_seconds()
+        if elapsed_ss < self._file_process_interval:
+            return True
+        return False
 
-    def prepare_file_queue(self):
+    def prepare_file_queue(self, known_files: dict[str, set[DagFileInfo]]):
         """
         Scan dags dir to generate more file paths to process.
 
@@ -846,63 +884,34 @@ class DagFileProcessorManager(LoggingMixin):
 
         # Sort the file paths by the parsing order mode
         list_mode = conf.get("dag_processor", "file_parsing_sort_mode")
-
-        files_with_mtime: dict[DagFileInfo, datetime] = {}
-        file_infos: list[DagFileInfo] = []
-        is_mtime_mode = list_mode == "modified_time"
-
-        recently_processed: list[DagFileInfo] = []
-        to_stop = set()
-        for file in self._files:
-            if is_mtime_mode:
-                try:
-                    files_with_mtime[file] = 
os.path.getmtime(file.absolute_path)
-                except FileNotFoundError:
-                    self.log.warning("Skipping processing of missing file: 
%s", file)
-                    self._file_stats.pop(file, None)
-                    to_stop.add(file)
-                    continue
-                file_modified_time = 
datetime.fromtimestamp(files_with_mtime[file], tz=timezone.utc)
-            else:
-                file_infos.append(file)
-                file_modified_time = None
-
-            # Find file paths that were recently processed to exclude them
-            # from being added to file_queue
-            # unless they were modified recently and parsing mode is 
"modified_time"
-            # in which case we don't honor "self._file_process_interval" 
(min_file_process_interval)
-            if (
-                (last_finish_time := self._file_stats[file].last_finish_time) 
is not None
-                and (now - last_finish_time).total_seconds() < 
self._file_process_interval
-                and not (is_mtime_mode and file_modified_time and 
(file_modified_time > last_finish_time))
-            ):
-                recently_processed.append(file)
-
-        # Sort file paths via last modified time
-        if is_mtime_mode:
-            file_infos = [
-                info for info, ts in sorted(files_with_mtime.items(), 
key=itemgetter(1), reverse=True)
-            ]
+        recently_processed = set()
+        files = []
+
+        for bundle_files in known_files.values():
+            for file in bundle_files:
+                files.append(file)
+                if self.processed_recently(now, file):
+                    recently_processed.add(file)
+
+        changed_recently: set[DagFileInfo] = set()
+        if list_mode == "modified_time":
+            files, changed_recently = self._sort_by_mtime(files=files)
         elif list_mode == "alphabetical":
-            file_infos.sort(key=attrgetter("rel_path"))
+            files.sort(key=attrgetter("rel_path"))
         elif list_mode == "random_seeded_by_host":
             # Shuffle the list seeded by hostname so multiple DAG processors 
can work on different
             # set of files. Since we set the seed, the sort order will remain 
same per host
-            random.Random(get_hostname()).shuffle(file_infos)
-
-        if to_stop:
-            self.set_files([x for x in self._files if x not in to_stop])
+            random.Random(get_hostname()).shuffle(files)
 
         at_run_limit = [info for info, stat in self._file_stats.items() if 
stat.run_count == self.max_runs]
+        to_exclude = in_progress.union(at_run_limit)
 
-        to_exclude = in_progress.union(
-            recently_processed,
-            at_run_limit,
-        )
+        # exclude recently processed unless changed recently
+        to_exclude |= recently_processed - changed_recently
 
         # Do not convert the following list to set as set does not preserve 
the order
         # and we need to maintain the order of files for `[dag_processor] 
file_parsing_sort_mode`
-        to_queue = [x for x in file_infos if x not in to_exclude]
+        to_queue = [x for x in files if x not in to_exclude]
 
         if self.log.isEnabledFor(logging.DEBUG):
             for path, processor in self._processors.items():
diff --git a/airflow/models/dag.py b/airflow/models/dag.py
index e571c016e19..7ebbb48b4b4 100644
--- a/airflow/models/dag.py
+++ b/airflow/models/dag.py
@@ -64,7 +64,7 @@ from sqlalchemy import (
 )
 from sqlalchemy.ext.associationproxy import association_proxy
 from sqlalchemy.ext.hybrid import hybrid_property
-from sqlalchemy.orm import backref, relationship
+from sqlalchemy.orm import backref, load_only, relationship
 from sqlalchemy.sql import Select, expression
 
 from airflow import settings, utils
@@ -2280,24 +2280,33 @@ class DagModel(Base):
     @provide_session
     def deactivate_deleted_dags(
         cls,
-        active: set[tuple[str, str]],
+        bundle_name: str,
+        rel_filelocs: list[str],
         session: Session = NEW_SESSION,
     ) -> None:
         """
         Set ``is_active=False`` on the DAGs for which the DAG files have been 
removed.
 
-        :param active: tuples (bundle name, relative fileloc) of files that 
were observed.
+        :param bundle_name: bundle for filelocs
+        :param rel_filelocs: relative filelocs for bundle
         :param session: ORM Session
         """
         log.debug("Deactivating DAGs (for which DAG files are deleted) from %s 
table ", cls.__tablename__)
         dag_models = session.scalars(
-            select(cls).where(
-                cls.relative_fileloc.is_not(None),
+            select(cls)
+            .where(
+                cls.bundle_name == bundle_name,
+            )
+            .options(
+                load_only(
+                    cls.relative_fileloc,
+                    cls.is_active,
+                ),
             )
         )
 
         for dm in dag_models:
-            if (dm.bundle_name, dm.relative_fileloc) not in active:
+            if dm.relative_fileloc not in rel_filelocs:
                 dm.is_active = False
 
     @classmethod
diff --git a/tests/dag_processing/test_manager.py 
b/tests/dag_processing/test_manager.py
index 46d6fe5f50a..e5cef2e8033 100644
--- a/tests/dag_processing/test_manager.py
+++ b/tests/dag_processing/test_manager.py
@@ -22,6 +22,8 @@ import json
 import logging
 import os
 import random
+import re
+import shutil
 import signal
 import textwrap
 import time
@@ -34,7 +36,7 @@ from unittest.mock import MagicMock
 
 import pytest
 import time_machine
-from sqlalchemy import func
+from sqlalchemy import func, select
 from uuid6 import uuid7
 
 from airflow.callbacks.callback_requests import DagCallbackRequest
@@ -76,10 +78,35 @@ TEST_DAG_FOLDER = Path(__file__).parents[1].resolve() / 
"dags"
 DEFAULT_DATE = timezone.datetime(2016, 1, 1)
 
 
-def _get_dag_file_paths(files: list[str | Path]) -> list[DagFileInfo]:
+def _get_file_infos(files: list[str | Path]) -> list[DagFileInfo]:
     return [DagFileInfo(bundle_name="testing", bundle_path=TEST_DAGS_FOLDER, 
rel_path=Path(f)) for f in files]
 
 
+def mock_get_mtime(file: Path):
+    f = str(file)
+    m = re.match(pattern=r".*ss=(.+?)\.\w+", string=f)
+    if not m:
+        raise ValueError(f"unexpected: {file}")
+    match = m.group(1)
+    if match == "<class 'FileNotFoundError'>":
+        raise FileNotFoundError()
+    try:
+        return int(match)
+    except Exception:
+        raise ValueError(f"could not convert value {match} to int")
+
+
+def encode_mtime_in_filename(val):
+    from pathlib import PurePath
+
+    out = []
+    for fname, mtime in val:
+        f = PurePath(PurePath(fname).name)
+        addition = f"ss={str(mtime)}"
+        out.append(f"{f.stem}-{addition}{f.suffix}")
+    return out
+
+
 class TestDagFileProcessorManager:
     @pytest.fixture(autouse=True)
     def _disable_examples(self):
@@ -190,117 +217,129 @@ class TestDagFileProcessorManager:
         assert file_2 in manager._processors.keys()
         assert deque([file_3]) == manager._file_queue
 
-    def 
test_set_file_paths_when_processor_file_path_not_in_new_file_paths(self):
+    def 
test_handle_removed_files_when_processor_file_path_not_in_new_file_paths(self):
         """Ensure processors and file stats are removed when the file path is 
not in the new file paths"""
         manager = DagFileProcessorManager(max_runs=1)
+        bundle_name = "testing"
         file = DagFileInfo(
-            bundle_name="testing", rel_path=Path("missing_file.txt"), 
bundle_path=TEST_DAGS_FOLDER
+            bundle_name=bundle_name, rel_path=Path("missing_file.txt"), 
bundle_path=TEST_DAGS_FOLDER
         )
 
         manager._processors[file] = MagicMock()
         manager._file_stats[file] = DagFileStat()
 
-        manager.set_files(["abc.txt"])
+        manager.handle_removed_files({bundle_name: set()})
         assert manager._processors == {}
         assert file not in manager._file_stats
 
-    def 
test_set_file_paths_when_processor_file_path_is_in_new_file_paths(self):
+    def test_handle_removed_files_when_processor_file_path_is_present(self):
+        """handle_removed_files should not purge files that are still 
present."""
         manager = DagFileProcessorManager(max_runs=1)
-        file = DagFileInfo(bundle_name="testing", rel_path=Path("abc.txt"), 
bundle_path=TEST_DAGS_FOLDER)
+        bundle_name = "testing"
+        file = DagFileInfo(bundle_name=bundle_name, rel_path=Path("abc.txt"), 
bundle_path=TEST_DAGS_FOLDER)
         mock_processor = MagicMock()
 
         manager._processors[file] = mock_processor
 
-        manager.set_files([file])
+        manager.handle_removed_files(known_files={bundle_name: {file}})
         assert manager._processors == {file: mock_processor}
 
     @conf_vars({("dag_processor", "file_parsing_sort_mode"): "alphabetical"})
-    def test_file_paths_in_queue_sorted_alphabetically(self):
+    def test_files_in_queue_sorted_alphabetically(self):
         """Test dag files are sorted alphabetically"""
         file_names = ["file_3.py", "file_2.py", "file_4.py", "file_1.py"]
-        dag_files = _get_dag_file_paths(file_names)
-        ordered_dag_files = _get_dag_file_paths(sorted(file_names))
+        dag_files = _get_file_infos(file_names)
+        ordered_dag_files = _get_file_infos(sorted(file_names))
 
         manager = DagFileProcessorManager(max_runs=1)
-
-        manager.set_files(dag_files)
+        known_files = {"some-bundle": set(dag_files)}
         assert manager._file_queue == deque()
-        manager.prepare_file_queue()
+        manager.prepare_file_queue(known_files=known_files)
         assert manager._file_queue == deque(ordered_dag_files)
 
     @conf_vars({("dag_processor", "file_parsing_sort_mode"): 
"random_seeded_by_host"})
-    def test_file_paths_in_queue_sorted_random_seeded_by_host(self):
+    def test_files_sorted_random_seeded_by_host(self):
         """Test files are randomly sorted and seeded by host name"""
-        dag_files = _get_dag_file_paths(["file_3.py", "file_2.py", 
"file_4.py", "file_1.py"])
 
+        f_infos = _get_file_infos(["file_3.py", "file_2.py", "file_4.py", 
"file_1.py"])
+        known_files = {"anything": f_infos}
         manager = DagFileProcessorManager(max_runs=1)
 
-        manager.set_files(dag_files)
         assert manager._file_queue == deque()
-        manager.prepare_file_queue()
-
-        expected_order = deque(dag_files)
-        random.Random(get_hostname()).shuffle(expected_order)
-        assert manager._file_queue == expected_order
+        manager.prepare_file_queue(known_files=known_files)  # using list over 
test for reproducibility
+        random.Random(get_hostname()).shuffle(f_infos)
+        expected = deque(f_infos)
+        assert manager._file_queue == expected
 
         # Verify running it again produces same order
         manager._files = []
-        manager.prepare_file_queue()
-        assert manager._file_queue == expected_order
+        manager.prepare_file_queue(known_files=known_files)
+        assert manager._file_queue == expected
 
     @conf_vars({("dag_processor", "file_parsing_sort_mode"): "modified_time"})
-    @mock.patch("airflow.utils.file.os.path.getmtime")
-    def test_file_paths_in_queue_sorted_by_modified_time(self, mock_getmtime):
+    @mock.patch("airflow.utils.file.os.path.getmtime", new=mock_get_mtime)
+    def test_files_sorted_by_modified_time(self):
         """Test files are sorted by modified time"""
-        paths_with_mtime = {"file_3.py": 3.0, "file_2.py": 2.0, "file_4.py": 
5.0, "file_1.py": 4.0}
-        dag_files = _get_dag_file_paths(paths_with_mtime.keys())
-        mock_getmtime.side_effect = list(paths_with_mtime.values())
+        paths_with_mtime = [
+            ("file_3.py", 3.0),
+            ("file_2.py", 2.0),
+            ("file_4.py", 5.0),
+            ("file_1.py", 4.0),
+        ]
+        filenames = encode_mtime_in_filename(paths_with_mtime)
+        dag_files = _get_file_infos(filenames)
 
         manager = DagFileProcessorManager(max_runs=1)
 
-        manager.set_files(dag_files)
         assert manager._file_queue == deque()
-        manager.prepare_file_queue()
-        ordered_files = _get_dag_file_paths(["file_4.py", "file_1.py", 
"file_3.py", "file_2.py"])
+        manager.prepare_file_queue(known_files={"any": set(dag_files)})
+        ordered_files = _get_file_infos(
+            [
+                "file_4-ss=5.0.py",
+                "file_1-ss=4.0.py",
+                "file_3-ss=3.0.py",
+                "file_2-ss=2.0.py",
+            ]
+        )
         assert manager._file_queue == deque(ordered_files)
 
     @conf_vars({("dag_processor", "file_parsing_sort_mode"): "modified_time"})
-    @mock.patch("airflow.utils.file.os.path.getmtime")
-    def test_file_paths_in_queue_excludes_missing_file(self, mock_getmtime):
+    @mock.patch("airflow.utils.file.os.path.getmtime", new=mock_get_mtime)
+    def test_queued_files_exclude_missing_file(self):
         """Check that a file is not enqueued for processing if it has been 
deleted"""
-        dag_files = _get_dag_file_paths(["file_3.py", "file_2.py", 
"file_4.py"])
-        mock_getmtime.side_effect = [1.0, 2.0, FileNotFoundError()]
-
+        file_and_mtime = [("file_3.py", 2.0), ("file_2.py", 3.0), 
("file_4.py", FileNotFoundError)]
+        filenames = encode_mtime_in_filename(file_and_mtime)
+        file_infos = _get_file_infos(filenames)
         manager = DagFileProcessorManager(max_runs=1)
-
-        manager.set_files(dag_files)
-        manager.prepare_file_queue()
-
-        ordered_files = _get_dag_file_paths(["file_2.py", "file_3.py"])
+        manager.prepare_file_queue(known_files={"any": set(file_infos)})
+        ordered_files = _get_file_infos(["file_2-ss=3.0.py", 
"file_3-ss=2.0.py"])
         assert manager._file_queue == deque(ordered_files)
 
     @conf_vars({("dag_processor", "file_parsing_sort_mode"): "modified_time"})
-    @mock.patch("airflow.utils.file.os.path.getmtime")
-    def test_add_new_file_to_parsing_queue(self, mock_getmtime):
+    @mock.patch("airflow.utils.file.os.path.getmtime", new=mock_get_mtime)
+    def test_add_new_file_to_parsing_queue(self):
         """Check that new file is added to parsing queue"""
-        dag_files = _get_dag_file_paths(["file_1.py", "file_2.py", 
"file_3.py"])
-        mock_getmtime.side_effect = [1.0, 2.0, 3.0]
+        dag_files = _get_file_infos(["file_1-ss=2.0.py", "file_2-ss=3.0.py", 
"file_3-ss=4.0.py"])
+        from random import Random
 
+        Random("file_2.py").random()
         manager = DagFileProcessorManager(max_runs=1)
 
-        manager.set_files(dag_files)
-        manager.prepare_file_queue()
-        ordered_files = _get_dag_file_paths(["file_3.py", "file_2.py", 
"file_1.py"])
-        assert manager._file_queue == deque(ordered_files)
+        manager.prepare_file_queue(known_files={"any": set(dag_files)})
+        assert set(manager._file_queue) == set(dag_files)
 
-        manager.set_files(
+        manager.prepare_file_queue(
+            known_files={"any": set((*dag_files, 
*_get_file_infos(["file_4-ss=1.0.py"])))}
+        )
+        # manager.add_files_to_queue()
+        ordered_files = _get_file_infos(
             [
-                *dag_files,
-                DagFileInfo(bundle_name="testing", rel_path=Path("file_4.py"), 
bundle_path=TEST_DAGS_FOLDER),
+                "file_3-ss=4.0.py",
+                "file_2-ss=3.0.py",
+                "file_1-ss=2.0.py",
+                "file_4-ss=1.0.py",
             ]
         )
-        manager.add_files_to_queue()
-        ordered_files = _get_dag_file_paths(["file_4.py", "file_3.py", 
"file_2.py", "file_1.py"])
         assert manager._file_queue == deque(ordered_files)
 
     @conf_vars({("dag_processor", "file_parsing_sort_mode"): "modified_time"})
@@ -314,7 +353,7 @@ class TestDagFileProcessorManager:
         dag_file = DagFileInfo(
             bundle_name="testing", rel_path=Path("file_1.py"), 
bundle_path=TEST_DAGS_FOLDER
         )
-        dag_files = [dag_file]
+        known_files = {"does-not-matter": {dag_file}}
         mock_getmtime.side_effect = [initial_file_1_mtime]
 
         manager = DagFileProcessorManager(max_runs=3)
@@ -325,10 +364,9 @@ class TestDagFileProcessorManager:
             dag_file: DagFileStat(1, 0, last_finish_time, 1.0, 1, 1),
         }
         with time_machine.travel(freezed_base_time):
-            manager.set_files(dag_files)
             assert manager._file_queue == deque()
             # File Path Queue will be empty as the "modified time" < "last 
finish time"
-            manager.prepare_file_queue()
+            manager.prepare_file_queue(known_files=known_files)
             assert manager._file_queue == deque()
 
         # Simulate the DAG modification by using modified_time which is greater
@@ -336,13 +374,12 @@ class TestDagFileProcessorManager:
         file_1_new_mtime = freezed_base_time - timedelta(seconds=5)
         file_1_new_mtime_ts = file_1_new_mtime.timestamp()
         with time_machine.travel(freezed_base_time):
-            manager.set_files(dag_files)
             assert manager._file_queue == deque()
             # File Path Queue will be empty as the "modified time" < "last 
finish time"
             mock_getmtime.side_effect = [file_1_new_mtime_ts]
-            manager.prepare_file_queue()
+            manager.prepare_file_queue(known_files=known_files)
             # Check that file is added to the queue even though file was just 
recently passed
-            assert manager._file_queue == deque(dag_files)
+            assert manager._file_queue == deque([dag_file])
             assert last_finish_time < file_1_new_mtime
             assert (
                 manager._file_process_interval
@@ -363,7 +400,7 @@ class TestDagFileProcessorManager:
 
         manager = DagFileProcessorManager(dag_directory="directory", 
max_runs=1)
 
-        manager.set_files(dag_files)
+        manager.handle_removed_files(dag_files)
         manager._file_queue = deque(["file_2.py", "file_3.py", "file_4.py", 
"file_1.py"])
         manager._refresh_requested_filelocs()
         assert manager._file_queue == deque(["file_1.py", "file_2.py", 
"file_3.py", "file_4.py"])
@@ -608,32 +645,37 @@ class TestDagFileProcessorManager:
         assert dag.get_is_active()
 
     @pytest.mark.usefixtures("testing_dag_bundle")
-    def test_refresh_dags_dir_deactivates_deleted_zipped_dags(self, tmp_path, 
configure_testing_dag_bundle):
+    def test_refresh_dags_dir_deactivates_deleted_zipped_dags(
+        self, session, tmp_path, configure_testing_dag_bundle
+    ):
         """Test DagFileProcessorManager._refresh_dag_dir method"""
-        dagbag = DagBag(dag_folder=tmp_path, include_examples=False)
-        zipped_dag_path = os.path.join(TEST_DAGS_FOLDER, "test_zip.zip")
-        dagbag.process_file(zipped_dag_path)
-        dag = dagbag.get_dag("test_zip_dag")
-        dag.sync_to_db()
-        SerializedDagModel.write_dag(dag, bundle_name="testing")
+        dag_id = "test_zip_dag"
+        filename = "test_zip.zip"
+        source_location = os.path.join(TEST_DAGS_FOLDER, filename)
+        bundle_path = Path(tmp_path, 
"test_refresh_dags_dir_deactivates_deleted_zipped_dags")
+        bundle_path.mkdir(exist_ok=True)
+        zip_dag_path = bundle_path / filename
+        shutil.copy(source_location, zip_dag_path)
+
+        with configure_testing_dag_bundle(bundle_path):
+            manager = DagFileProcessorManager(max_runs=1)
+            manager.run()
 
-        # TODO: this test feels a bit fragile - pointing at the zip directly 
causes the test to fail
-        # TODO: jed look at this more closely - bagbad then process_file?!
+            assert SerializedDagModel.has_dag(dag_id)
+            assert DagCode.has_dag(dag_id)
+            assert DagVersion.get_latest_version(dag_id)
+            dag = session.scalar(select(DagModel).where(DagModel.dag_id == 
dag_id))
+            assert dag.is_active is True
 
-        # Mock might_contain_dag to mimic deleting the python file from the zip
-        with mock.patch("airflow.dag_processing.manager.might_contain_dag", 
return_value=False):
-            with configure_testing_dag_bundle(TEST_DAGS_FOLDER):
-                manager = DagFileProcessorManager(max_runs=1)
-                manager.run()
+            os.remove(zip_dag_path)
 
-        # Deleting the python file should not delete SDM for versioning sake
-        assert SerializedDagModel.has_dag("test_zip_dag")
-        # assert code not deleted for versioning sake
-        assert DagCode.has_dag(dag.dag_id)
-        # assert dagversion was not deleted
-        assert DagVersion.get_latest_version(dag.dag_id)
-        # assert dag deactivated
-        assert not dag.get_is_active()
+            manager.run()
+
+            assert SerializedDagModel.has_dag(dag_id)
+            assert DagCode.has_dag(dag_id)
+            assert DagVersion.get_latest_version(dag_id)
+            dag = session.scalar(select(DagModel).where(DagModel.dag_id == 
dag_id))
+            assert dag.is_active is False
 
     def test_deactivate_deleted_dags(self, dag_maker):
         with dag_maker("test_dag1") as dag1:
@@ -643,12 +685,16 @@ class TestDagFileProcessorManager:
         dag_maker.sync_dagbag_to_db()
 
         active_files = [
-            DagFileInfo(bundle_name="dag_maker", 
rel_path=Path("test_dag1.py"), bundle_path=TEST_DAGS_FOLDER),
+            DagFileInfo(
+                bundle_name="dag_maker",
+                rel_path=Path("test_dag1.py"),
+                bundle_path=TEST_DAGS_FOLDER,
+            ),
             # Mimic that the test_dag2.py file is deleted
         ]
 
         manager = DagFileProcessorManager(max_runs=1)
-        manager.deactivate_deleted_dags(active_files=active_files)
+        manager.deactivate_deleted_dags("dag_maker", active_files)
 
         dagbag = DagBag(read_dags_from_db=True)
         # The DAG from test_dag1.py is still active
diff --git a/tests/models/test_dag.py b/tests/models/test_dag.py
index 84801d6cfb5..af815279737 100644
--- a/tests/models/test_dag.py
+++ b/tests/models/test_dag.py
@@ -1090,7 +1090,10 @@ class TestDag:
 
         assert orm_dag.is_active
 
-        
DagModel.deactivate_deleted_dags(list_py_file_paths(settings.DAGS_FOLDER))
+        DagModel.deactivate_deleted_dags(
+            bundle_name=orm_dag.bundle_name,
+            rel_filelocs=list_py_file_paths(settings.DAGS_FOLDER),
+        )
 
         orm_dag = session.query(DagModel).filter(DagModel.dag_id == 
dag_id).one()
         assert not orm_dag.is_active


Reply via email to