This is an automated email from the ASF dual-hosted git repository.

ash pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 9aa2cea9534 Remove unused `dag_ids` argument to DagFile processor classes. (#44845)
9aa2cea9534 is described below

commit 9aa2cea953480ef69f950d6d82088ad81b509287
Author: Ash Berlin-Taylor <a...@apache.org>
AuthorDate: Wed Dec 11 17:52:28 2024 +0000

    Remove unused `dag_ids` argument to DagFile processor classes. (#44845)
    
    It hasn't been possible to set this argument for a while (since sometime
    before 2.0, possibly even before 1.8). The docstring hints at the intended
    behaviour: to schedule only certain DAGs. But scheduling is not the job of
    the DAG processor, and it hasn't been involved in that flow since 2.0.
    
    Time to go.
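    
    For callers constructing the agent directly, the call-site change looks
    roughly like this (an illustrative sketch, not code from this commit; the
    `dags_folder` path and the timeout value are placeholders):
    
        from datetime import timedelta
    
        from airflow.dag_processing.manager import DagFileProcessorAgent
    
        dags_folder = "/path/to/dags"  # placeholder
    
        # Before this commit: dag_ids had to be passed, but its value was ignored
        agent = DagFileProcessorAgent(dags_folder, 1, timedelta(minutes=5), [])
    
        # After this commit: the unused argument is gone
        agent = DagFileProcessorAgent(dags_folder, 1, timedelta(minutes=5))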
---
 .../local_commands/dag_processor_command.py        |  1 -
 airflow/dag_processing/manager.py                  | 20 +-----
 airflow/dag_processing/processor.py                | 13 +---
 airflow/jobs/scheduler_job_runner.py               |  1 -
 tests/dag_processing/test_manager.py               | 74 ++++++----------------
 tests/dag_processing/test_processor.py             | 26 ++------
 tests/listeners/test_dag_import_error_listener.py  |  4 +-
 7 files changed, 31 insertions(+), 108 deletions(-)

diff --git a/airflow/cli/commands/local_commands/dag_processor_command.py b/airflow/cli/commands/local_commands/dag_processor_command.py
index 03513df17a0..f0c3bc5060c 100644
--- a/airflow/cli/commands/local_commands/dag_processor_command.py
+++ b/airflow/cli/commands/local_commands/dag_processor_command.py
@@ -43,7 +43,6 @@ def _create_dag_processor_job_runner(args: Any) -> DagProcessorJobRunner:
             processor_timeout=processor_timeout,
             dag_directory=args.subdir,
             max_runs=args.num_runs,
-            dag_ids=[],
         ),
     )
 
diff --git a/airflow/dag_processing/manager.py b/airflow/dag_processing/manager.py
index 7d9c9298a99..f60377d4966 100644
--- a/airflow/dag_processing/manager.py
+++ b/airflow/dag_processing/manager.py
@@ -116,7 +116,6 @@ class DagFileProcessorAgent(LoggingMixin, MultiprocessingStartMethodMixin):
     :param max_runs: The number of times to parse and schedule each file. -1
         for unlimited.
     :param processor_timeout: How long to wait before timing out a DAG file processor
-    :param dag_ids: if specified, only schedule tasks with these DAG IDs
     """
 
     def __init__(
@@ -124,13 +123,11 @@ class DagFileProcessorAgent(LoggingMixin, MultiprocessingStartMethodMixin):
         dag_directory: os.PathLike,
         max_runs: int,
         processor_timeout: timedelta,
-        dag_ids: list[str] | None,
     ):
         super().__init__()
         self._dag_directory: os.PathLike = dag_directory
         self._max_runs = max_runs
         self._processor_timeout = processor_timeout
-        self._dag_ids = dag_ids
         # Map from file path to the processor
         self._processors: dict[str, DagFileProcessorProcess] = {}
         # Pipe for communicating signals
@@ -156,7 +153,6 @@ class DagFileProcessorAgent(LoggingMixin, MultiprocessingStartMethodMixin):
                 self._max_runs,
                 self._processor_timeout,
                 child_signal_conn,
-                self._dag_ids,
             ),
         )
         self._process = process
@@ -177,26 +173,19 @@ class DagFileProcessorAgent(LoggingMixin, MultiprocessingStartMethodMixin):
         max_runs: int,
         processor_timeout: timedelta,
         signal_conn: MultiprocessingConnection,
-        dag_ids: list[str] | None,
     ) -> None:
         # Make this process start as a new process group - that makes it easy
         # to kill all sub-process of this at the OS-level, rather than having
         # to iterate the child processes
         set_new_process_group()
         span = Trace.get_current_span()
-        span.set_attributes(
-            {
-                "dag_directory": str(dag_directory),
-                "dag_ids": str(dag_ids),
-            }
-        )
+        span.set_attribute("dag_directory", str(dag_directory))
         setproctitle("airflow scheduler -- DagFileProcessorManager")
         reload_configuration_for_dag_processing()
         processor_manager = DagFileProcessorManager(
             dag_directory=dag_directory,
             max_runs=max_runs,
             processor_timeout=processor_timeout,
-            dag_ids=dag_ids,
             signal_conn=signal_conn,
         )
         processor_manager.start()
@@ -307,7 +296,6 @@ class DagFileProcessorManager(LoggingMixin):
         for unlimited.
     :param processor_timeout: How long to wait before timing out a DAG file processor
     :param signal_conn: connection to communicate signal with processor agent.
-    :param dag_ids: if specified, only schedule tasks with these DAG IDs
     """
 
     def __init__(
@@ -315,7 +303,6 @@ class DagFileProcessorManager(LoggingMixin):
         dag_directory: os.PathLike[str],
         max_runs: int,
         processor_timeout: timedelta,
-        dag_ids: list[str] | None,
         signal_conn: MultiprocessingConnection | None = None,
     ):
         super().__init__()
@@ -325,7 +312,6 @@ class DagFileProcessorManager(LoggingMixin):
         self._max_runs = max_runs
         # signal_conn is None for dag_processor_standalone mode.
         self._direct_scheduler_conn = signal_conn
-        self._dag_ids = dag_ids
         self._parsing_start_time: float | None = None
         self._dag_directory = dag_directory
         # Set the signal conn in to non-blocking mode, so that attempting to
@@ -1001,11 +987,10 @@ class DagFileProcessorManager(LoggingMixin):
         self.log.debug("%s file paths queued for processing", len(self._file_path_queue))
 
     @staticmethod
-    def _create_process(file_path, dag_ids, dag_directory, callback_requests):
+    def _create_process(file_path, dag_directory, callback_requests):
         """Create DagFileProcessorProcess instance."""
         return DagFileProcessorProcess(
             file_path=file_path,
-            dag_ids=dag_ids,
             dag_directory=dag_directory,
             callback_requests=callback_requests,
         )
@@ -1026,7 +1011,6 @@ class DagFileProcessorManager(LoggingMixin):
             callback_to_execute_for_file = self._callback_to_execute[file_path]
             processor = self._create_process(
                 file_path,
-                self._dag_ids,
                 self.get_dag_directory(),
                 callback_to_execute_for_file,
             )
diff --git a/airflow/dag_processing/processor.py b/airflow/dag_processing/processor.py
index 840c17300f5..b3e6ff770b8 100644
--- a/airflow/dag_processing/processor.py
+++ b/airflow/dag_processing/processor.py
@@ -92,7 +92,6 @@ class DagFileProcessorProcess(LoggingMixin, MultiprocessingStartMethodMixin):
     Runs DAG processing in a separate process using DagFileProcessor.
 
     :param file_path: a Python file containing Airflow DAG definitions
-    :param dag_ids: If specified, only look at these DAG ID's
     :param callback_requests: failure callback to execute
     """
 
@@ -102,13 +101,11 @@ class DagFileProcessorProcess(LoggingMixin, MultiprocessingStartMethodMixin):
     def __init__(
         self,
         file_path: str,
-        dag_ids: list[str] | None,
         dag_directory: str,
         callback_requests: list[CallbackRequest],
     ):
         super().__init__()
         self._file_path = file_path
-        self._dag_ids = dag_ids
         self._dag_directory = dag_directory
         self._callback_requests = callback_requests
 
@@ -136,7 +133,6 @@ class DagFileProcessorProcess(LoggingMixin, MultiprocessingStartMethodMixin):
         result_channel: MultiprocessingConnection,
         parent_channel: MultiprocessingConnection,
         file_path: str,
-        dag_ids: list[str] | None,
         thread_name: str,
         dag_directory: str,
         callback_requests: list[CallbackRequest],
@@ -147,8 +143,6 @@ class DagFileProcessorProcess(LoggingMixin, MultiprocessingStartMethodMixin):
         :param result_channel: the connection to use for passing back the result
         :param parent_channel: the parent end of the channel to close in the child
         :param file_path: the file to process
-        :param dag_ids: if specified, only examine DAG ID's that are
-            in this list
         :param thread_name: the name to use for the process that is launched
         :param callback_requests: failure callback to execute
         :return: the process that was launched
@@ -174,7 +168,7 @@ class DagFileProcessorProcess(LoggingMixin, MultiprocessingStartMethodMixin):
             threading.current_thread().name = thread_name
 
             log.info("Started process (PID=%s) to work on %s", os.getpid(), 
file_path)
-            dag_file_processor = DagFileProcessor(dag_ids=dag_ids, 
dag_directory=dag_directory, log=log)
+            dag_file_processor = DagFileProcessor(dag_directory=dag_directory, 
log=log)
             result: tuple[int, int, int] = dag_file_processor.process_file(
                 file_path=file_path,
                 callback_requests=callback_requests,
@@ -241,7 +235,6 @@ class DagFileProcessorProcess(LoggingMixin, MultiprocessingStartMethodMixin):
                 _child_channel,
                 _parent_channel,
                 self.file_path,
-                self._dag_ids,
                 f"DagFileProcessor{self._instance_id}",
                 self._dag_directory,
                 self._callback_requests,
@@ -415,15 +408,13 @@ class DagFileProcessor(LoggingMixin):
 
     Returns a tuple of 'number of dags found' and 'the count of import errors'
 
-    :param dag_ids: If specified, only look at these DAG ID's
     :param log: Logger to save the processing process
     """
 
     UNIT_TEST_MODE: bool = conf.getboolean("core", "UNIT_TEST_MODE")
 
-    def __init__(self, dag_ids: list[str] | None, dag_directory: str, log: logging.Logger):
+    def __init__(self, dag_directory: str, log: logging.Logger):
         super().__init__()
-        self.dag_ids = dag_ids
         self._log = log
         self._dag_directory = dag_directory
         self.dag_warnings: set[tuple[str, str]] = set()
diff --git a/airflow/jobs/scheduler_job_runner.py b/airflow/jobs/scheduler_job_runner.py
index 21fa41aa2c5..0254b417a71 100644
--- a/airflow/jobs/scheduler_job_runner.py
+++ b/airflow/jobs/scheduler_job_runner.py
@@ -934,7 +934,6 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
                 dag_directory=Path(self.subdir),
                 max_runs=self.num_times_parse_dags,
                 processor_timeout=processor_timeout,
-                dag_ids=[],
             )
 
         reset_signals = self.register_signals()
diff --git a/tests/dag_processing/test_manager.py b/tests/dag_processing/test_manager.py
index 4a338e164d6..fedc15a7437 100644
--- a/tests/dag_processing/test_manager.py
+++ b/tests/dag_processing/test_manager.py
@@ -80,8 +80,8 @@ DEFAULT_DATE = timezone.datetime(2016, 1, 1)
 class FakeDagFileProcessorRunner(DagFileProcessorProcess):
     # This fake processor will return the zombies it received in constructor
     # as its processing result w/o actually parsing anything.
-    def __init__(self, file_path, dag_ids, dag_directory, callbacks):
-        super().__init__(file_path, dag_ids, dag_directory, callbacks)
+    def __init__(self, file_path, dag_directory, callbacks):
+        super().__init__(file_path, dag_directory, callbacks)
         # We need a "real" selectable handle for waitable_handle to work
         readable, writable = multiprocessing.Pipe(duplex=False)
         writable.send("abc")
@@ -109,10 +109,9 @@ class FakeDagFileProcessorRunner(DagFileProcessorProcess):
         return self._result
 
     @staticmethod
-    def _create_process(file_path, callback_requests, dag_ids, dag_directory):
+    def _create_process(file_path, callback_requests, dag_directory):
         return FakeDagFileProcessorRunner(
             file_path,
-            dag_ids,
             dag_directory,
             callback_requests,
         )
@@ -169,7 +168,6 @@ class TestDagProcessorJobRunner:
             max_runs=1,
             processor_timeout=timedelta(days=365),
             signal_conn=child_pipe,
-            dag_ids=[],
         )
 
         with create_session() as session:
@@ -199,7 +197,6 @@ class TestDagProcessorJobRunner:
             max_runs=1,
             processor_timeout=timedelta(days=365),
             signal_conn=child_pipe,
-            dag_ids=[],
         )
 
         self.run_processor_manager_one_loop(manager, parent_pipe)
@@ -217,7 +214,6 @@ class TestDagProcessorJobRunner:
             max_runs=1,
             processor_timeout=timedelta(days=365),
             signal_conn=MagicMock(),
-            dag_ids=[],
         )
 
         file_1 = "file_1.py"
@@ -246,7 +242,6 @@ class TestDagProcessorJobRunner:
             max_runs=1,
             processor_timeout=timedelta(days=365),
             signal_conn=MagicMock(),
-            dag_ids=[],
         )
 
         mock_processor = MagicMock()
@@ -266,7 +261,6 @@ class TestDagProcessorJobRunner:
             max_runs=1,
             processor_timeout=timedelta(days=365),
             signal_conn=MagicMock(),
-            dag_ids=[],
         )
 
         mock_processor = MagicMock()
@@ -295,7 +289,6 @@ class TestDagProcessorJobRunner:
             max_runs=1,
             processor_timeout=timedelta(days=365),
             signal_conn=MagicMock(),
-            dag_ids=[],
         )
 
         manager.set_file_paths(dag_files)
@@ -320,7 +313,6 @@ class TestDagProcessorJobRunner:
             max_runs=1,
             processor_timeout=timedelta(days=365),
             signal_conn=MagicMock(),
-            dag_ids=[],
         )
 
         manager.set_file_paths(dag_files)
@@ -380,7 +372,6 @@ class TestDagProcessorJobRunner:
             max_runs=1,
             processor_timeout=timedelta(days=365),
             signal_conn=MagicMock(),
-            dag_ids=[],
         )
 
         manager.set_file_paths(dag_files)
@@ -413,7 +404,6 @@ class TestDagProcessorJobRunner:
             max_runs=1,
             processor_timeout=timedelta(days=365),
             signal_conn=MagicMock(),
-            dag_ids=[],
         )
 
         manager.set_file_paths(dag_files)
@@ -445,7 +435,6 @@ class TestDagProcessorJobRunner:
             max_runs=1,
             processor_timeout=timedelta(days=365),
             signal_conn=MagicMock(),
-            dag_ids=[],
         )
 
         manager.set_file_paths(dag_files)
@@ -486,7 +475,6 @@ class TestDagProcessorJobRunner:
             max_runs=3,
             processor_timeout=timedelta(days=365),
             signal_conn=MagicMock(),
-            dag_ids=[],
         )
 
         # let's say the DAG was just parsed 10 seconds before the Freezed time
@@ -542,7 +530,6 @@ class TestDagProcessorJobRunner:
             max_runs=1,
             processor_timeout=timedelta(days=365),
             signal_conn=MagicMock(),
-            dag_ids=[],
         )
 
         manager.set_file_paths(dag_files)
@@ -563,7 +550,6 @@ class TestDagProcessorJobRunner:
             max_runs=1,
             processor_timeout=timedelta(minutes=10),
             signal_conn=MagicMock(),
-            dag_ids=[],
         )
 
         test_dag_path = str(TEST_DAG_FOLDER / "test_example_bash_operator.py")
@@ -630,7 +616,6 @@ class TestDagProcessorJobRunner:
             max_runs=1,
             processor_timeout=timedelta(minutes=10),
             signal_conn=MagicMock(),
-            dag_ids=[],
         )
 
         test_dag_path = str(TEST_DAG_FOLDER / "test_example_bash_operator.py")
@@ -682,12 +667,10 @@ class TestDagProcessorJobRunner:
             max_runs=1,
             processor_timeout=timedelta(seconds=5),
             signal_conn=MagicMock(),
-            dag_ids=[],
         )
 
         processor = DagFileProcessorProcess(
             file_path="abc.txt",
-            dag_ids=[],
             dag_directory=TEST_DAG_FOLDER,
             callback_requests=[],
         )
@@ -709,12 +692,10 @@ class TestDagProcessorJobRunner:
             max_runs=1,
             processor_timeout=timedelta(seconds=5),
             signal_conn=MagicMock(),
-            dag_ids=[],
         )
 
         processor = DagFileProcessorProcess(
             file_path="abc.txt",
-            dag_ids=[],
             dag_directory=str(TEST_DAG_FOLDER),
             callback_requests=[],
         )
@@ -741,7 +722,6 @@ class TestDagProcessorJobRunner:
 
         manager = DagFileProcessorManager(
             dag_directory=dag_directory,
-            dag_ids=[],
             max_runs=1,
             processor_timeout=timedelta(seconds=5),
             signal_conn=child_pipe,
@@ -782,7 +762,6 @@ class TestDagProcessorJobRunner:
 
             manager = DagFileProcessorManager(
                 dag_directory=processor_dir_1,
-                dag_ids=[],
                 max_runs=1,
                 signal_conn=child_pipe,
                 processor_timeout=timedelta(seconds=5),
@@ -798,7 +777,6 @@ class TestDagProcessorJobRunner:
 
             manager = DagFileProcessorManager(
                 dag_directory=processor_dir_2,
-                dag_ids=[],
                 max_runs=1,
                 signal_conn=child_pipe,
                 processor_timeout=timedelta(seconds=5),
@@ -863,7 +841,6 @@ class TestDagProcessorJobRunner:
 
         manager = DagFileProcessorManager(
             dag_directory=dag_filepath,
-            dag_ids=[],
             # A reasonable large number to ensure that we trigger the deadlock
             max_runs=100,
             processor_timeout=timedelta(seconds=5),
@@ -904,7 +881,6 @@ class TestDagProcessorJobRunner:
             max_runs=1,
             processor_timeout=timedelta(days=365),
             signal_conn=child_pipe,
-            dag_ids=[],
         )
 
         self.run_processor_manager_one_loop(manager, parent_pipe)
@@ -933,7 +909,6 @@ class TestDagProcessorJobRunner:
             max_runs=1,
             processor_timeout=timedelta(days=365),
             signal_conn=MagicMock(),
-            dag_ids=[],
         )
         dagbag = DagBag(dag_folder=tmp_path, include_examples=False)
         zipped_dag_path = os.path.join(TEST_DAGS_FOLDER, "test_zip.zip")
@@ -957,7 +932,6 @@ class TestDagProcessorJobRunner:
             max_runs=1,
             processor_timeout=timedelta(days=365),
             signal_conn=MagicMock(),
-            dag_ids=[],
         )
         dagbag = DagBag(dag_folder=tmp_path, include_examples=False)
         zipped_dag_path = os.path.join(TEST_DAGS_FOLDER, "test_zip.zip")
@@ -999,7 +973,6 @@ class TestDagProcessorJobRunner:
             max_runs=1,
             processor_timeout=timedelta(days=365),
             signal_conn=MagicMock(),
-            dag_ids=[],
         )
         manager.last_dag_dir_refresh_time = timezone.utcnow() - timedelta(minutes=10)
 
@@ -1044,7 +1017,6 @@ class TestDagProcessorJobRunner:
             max_runs=1,
             processor_timeout=timedelta(days=365),
             signal_conn=child_pipe,
-            dag_ids=[],
         )
 
         with create_session() as session:
@@ -1086,7 +1058,6 @@ class TestDagProcessorJobRunner:
             max_runs=1,
             processor_timeout=timedelta(days=365),
             signal_conn=child_pipe,
-            dag_ids=[],
         )
 
         with create_session() as session:
@@ -1121,7 +1092,6 @@ class TestDagProcessorJobRunner:
             max_runs=1,
             processor_timeout=timedelta(days=365),
             signal_conn=child_pipe,
-            dag_ids=[],
         )
 
         with create_session() as session:
@@ -1157,7 +1127,6 @@ class TestDagProcessorJobRunner:
             max_runs=1,
             processor_timeout=timedelta(days=365),
             signal_conn=child_pipe,
-            dag_ids=[],
         )
 
         with create_session() as session:
@@ -1175,7 +1144,6 @@ class TestDagProcessorJobRunner:
             max_runs=1,
             processor_timeout=timedelta(days=365),
             signal_conn=MagicMock(),
-            dag_ids=[],
         )
 
         dag1_req1 = DagCallbackRequest(
@@ -1273,7 +1241,7 @@ class TestDagFileProcessorAgent:
                 os.remove(log_file_loc)
 
             # Starting dag processing with 0 max_runs to avoid redundant operations.
-            processor_agent = DagFileProcessorAgent(test_dag_path, 0, timedelta(days=365), [])
+            processor_agent = DagFileProcessorAgent(test_dag_path, 0, timedelta(days=365))
             processor_agent.start()
 
             processor_agent._process.join()
@@ -1288,7 +1256,7 @@ class TestDagFileProcessorAgent:
         clear_db_dags()
 
         test_dag_path = TEST_DAG_FOLDER / "test_scheduler_dags.py"
-        processor_agent = DagFileProcessorAgent(test_dag_path, 1, timedelta(days=365), [])
+        processor_agent = DagFileProcessorAgent(test_dag_path, 1, timedelta(days=365))
         processor_agent.start()
         while not processor_agent.done:
             processor_agent.heartbeat()
@@ -1311,7 +1279,7 @@ class TestDagFileProcessorAgent:
             os.remove(log_file_loc)
 
         # Starting dag processing with 0 max_runs to avoid redundant operations.
-        processor_agent = DagFileProcessorAgent(test_dag_path, 0, timedelta(days=365), [])
+        processor_agent = DagFileProcessorAgent(test_dag_path, 0, timedelta(days=365))
         processor_agent.start()
 
         processor_agent._process.join()
@@ -1319,25 +1287,25 @@ class TestDagFileProcessorAgent:
         assert os.path.isfile(log_file_loc)
 
     def test_get_callbacks_pipe(self):
-        processor_agent = DagFileProcessorAgent("", 1, timedelta(days=365), [])
+        processor_agent = DagFileProcessorAgent("", 1, timedelta(days=365))
         processor_agent._parent_signal_conn = Mock()
         retval = processor_agent.get_callbacks_pipe()
         assert retval == processor_agent._parent_signal_conn
 
     def test_get_callbacks_pipe_no_parent_signal_conn(self):
-        processor_agent = DagFileProcessorAgent("", 1, timedelta(days=365), [])
+        processor_agent = DagFileProcessorAgent("", 1, timedelta(days=365))
         processor_agent._parent_signal_conn = None
         with pytest.raises(ValueError, match="Process not started"):
             processor_agent.get_callbacks_pipe()
 
     def test_heartbeat_no_parent_signal_conn(self):
-        processor_agent = DagFileProcessorAgent("", 1, timedelta(days=365), [])
+        processor_agent = DagFileProcessorAgent("", 1, timedelta(days=365))
         processor_agent._parent_signal_conn = None
         with pytest.raises(ValueError, match="Process not started"):
             processor_agent.heartbeat()
 
     def test_heartbeat_poll_eof_error(self):
-        processor_agent = DagFileProcessorAgent("", 1, timedelta(days=365), [])
+        processor_agent = DagFileProcessorAgent("", 1, timedelta(days=365))
         processor_agent._parent_signal_conn = Mock()
         processor_agent._parent_signal_conn.poll.return_value = True
         processor_agent._parent_signal_conn.recv = Mock()
@@ -1346,7 +1314,7 @@ class TestDagFileProcessorAgent:
         assert ret_val is None
 
     def test_heartbeat_poll_connection_error(self):
-        processor_agent = DagFileProcessorAgent("", 1, timedelta(days=365), [])
+        processor_agent = DagFileProcessorAgent("", 1, timedelta(days=365))
         processor_agent._parent_signal_conn = Mock()
         processor_agent._parent_signal_conn.poll.return_value = True
         processor_agent._parent_signal_conn.recv = Mock()
@@ -1355,7 +1323,7 @@ class TestDagFileProcessorAgent:
         assert ret_val is None
 
     def test_heartbeat_poll_process_message(self):
-        processor_agent = DagFileProcessorAgent("", 1, timedelta(days=365), [])
+        processor_agent = DagFileProcessorAgent("", 1, timedelta(days=365))
         processor_agent._parent_signal_conn = Mock()
         processor_agent._parent_signal_conn.poll.side_effect = [True, False]
         processor_agent._parent_signal_conn.recv = Mock()
@@ -1366,19 +1334,19 @@ class TestDagFileProcessorAgent:
 
     def test_process_message_invalid_type(self):
         message = "xyz"
-        processor_agent = DagFileProcessorAgent("", 1, timedelta(days=365), [])
+        processor_agent = DagFileProcessorAgent("", 1, timedelta(days=365))
         with pytest.raises(RuntimeError, match="Unexpected message received of type str"):
             processor_agent._process_message(message)
 
     def test_heartbeat_manager(self):
-        processor_agent = DagFileProcessorAgent("", 1, timedelta(days=365), [])
+        processor_agent = DagFileProcessorAgent("", 1, timedelta(days=365))
         processor_agent._parent_signal_conn = None
         with pytest.raises(ValueError, match="Process not started"):
             processor_agent._heartbeat_manager()
 
     @mock.patch("airflow.utils.process_utils.reap_process_group")
     def test_heartbeat_manager_process_restart(self, mock_pg):
-        processor_agent = DagFileProcessorAgent("", 1, timedelta(days=365), [])
+        processor_agent = DagFileProcessorAgent("", 1, timedelta(days=365))
         processor_agent._parent_signal_conn = Mock()
         processor_agent._process = MagicMock()
         processor_agent.start = Mock()
@@ -1392,7 +1360,7 @@ class TestDagFileProcessorAgent:
     @mock.patch("time.monotonic")
     @mock.patch("airflow.dag_processing.manager.reap_process_group")
     def test_heartbeat_manager_process_reap(self, mock_pg, mock_time_monotonic, mock_stats):
-        processor_agent = DagFileProcessorAgent("", 1, timedelta(days=365), [])
+        processor_agent = DagFileProcessorAgent("", 1, timedelta(days=365))
         processor_agent._parent_signal_conn = Mock()
         processor_agent._process = Mock()
         processor_agent._process.pid = 12345
@@ -1413,7 +1381,7 @@ class TestDagFileProcessorAgent:
         processor_agent.start.assert_called()
 
     def test_heartbeat_manager_terminate(self):
-        processor_agent = DagFileProcessorAgent("", 1, timedelta(days=365), [])
+        processor_agent = DagFileProcessorAgent("", 1, timedelta(days=365))
         processor_agent._parent_signal_conn = Mock()
         processor_agent._process = Mock()
         processor_agent._process.is_alive.return_value = True
@@ -1423,7 +1391,7 @@ class TestDagFileProcessorAgent:
         processor_agent._parent_signal_conn.send.assert_called_with(DagParsingSignal.TERMINATE_MANAGER)
 
     def test_heartbeat_manager_terminate_conn_err(self):
-        processor_agent = DagFileProcessorAgent("", 1, timedelta(days=365), [])
+        processor_agent = DagFileProcessorAgent("", 1, timedelta(days=365))
         processor_agent._process = Mock()
         processor_agent._process.is_alive.return_value = True
         processor_agent._parent_signal_conn = Mock()
@@ -1434,7 +1402,7 @@ class TestDagFileProcessorAgent:
         processor_agent._parent_signal_conn.send.assert_called_with(DagParsingSignal.TERMINATE_MANAGER)
 
     def test_heartbeat_manager_end_no_process(self):
-        processor_agent = DagFileProcessorAgent("", 1, timedelta(days=365), [])
+        processor_agent = DagFileProcessorAgent("", 1, timedelta(days=365))
         processor_agent._process = Mock()
         processor_agent._process.__bool__ = Mock(return_value=False)
         processor_agent._process.side_effect = [None]
@@ -1449,7 +1417,7 @@ class TestDagFileProcessorAgent:
         test_dag_path = TEST_DAG_FOLDER / "test_scheduler_dags.py"
 
         # Starting dag processing with 0 max_runs to avoid redundant operations.
-        processor_agent = DagFileProcessorAgent(test_dag_path, 0, timedelta(days=365), [])
+        processor_agent = DagFileProcessorAgent(test_dag_path, 0, timedelta(days=365))
         processor_agent.start()
 
         processor_agent._process.join()
@@ -1464,7 +1432,7 @@ class TestDagFileProcessorAgent:
         test_dag_path = TEST_DAG_FOLDER / "test_scheduler_dags.py"
 
         # Starting dag processing with 0 max_runs to avoid redundant operations.
-        processor_agent = DagFileProcessorAgent(test_dag_path, 0, timedelta(days=365), [])
+        processor_agent = DagFileProcessorAgent(test_dag_path, 0, timedelta(days=365))
         processor_agent.start()
 
         processor_agent._process.join()
diff --git a/tests/dag_processing/test_processor.py b/tests/dag_processing/test_processor.py
index c2962ea0411..b23cd44f959 100644
--- a/tests/dag_processing/test_processor.py
+++ b/tests/dag_processing/test_processor.py
@@ -107,18 +107,14 @@ class TestDagFileProcessor:
         self.clean_db()
 
     def _process_file(self, file_path, dag_directory, session):
-        dag_file_processor = DagFileProcessor(
-            dag_ids=[], dag_directory=str(dag_directory), log=mock.MagicMock()
-        )
+        dag_file_processor = DagFileProcessor(dag_directory=str(dag_directory), log=mock.MagicMock())
 
         dag_file_processor.process_file(file_path, [])
 
     @patch.object(TaskInstance, "handle_failure")
     def test_execute_on_failure_callbacks(self, mock_ti_handle_failure):
         dagbag = DagBag(dag_folder="/dev/null", include_examples=True, read_dags_from_db=False)
-        dag_file_processor = DagFileProcessor(
-            dag_ids=[], dag_directory=TEST_DAGS_FOLDER, log=mock.MagicMock()
-        )
+        dag_file_processor = DagFileProcessor(dag_directory=TEST_DAGS_FOLDER, log=mock.MagicMock())
         with create_session() as session:
             session.query(TaskInstance).delete()
             dag = dagbag.get_dag("example_branch_operator")
@@ -152,9 +148,7 @@ class TestDagFileProcessor:
     @patch.object(TaskInstance, "handle_failure")
     def test_execute_on_failure_callbacks_without_dag(self, mock_ti_handle_failure, has_serialized_dag):
         dagbag = DagBag(dag_folder="/dev/null", include_examples=True, read_dags_from_db=False)
-        dag_file_processor = DagFileProcessor(
-            dag_ids=[], dag_directory=TEST_DAGS_FOLDER, log=mock.MagicMock()
-        )
+        dag_file_processor = DagFileProcessor(dag_directory=TEST_DAGS_FOLDER, log=mock.MagicMock())
         with create_session() as session:
             session.query(TaskInstance).delete()
             dag = dagbag.get_dag("example_branch_operator")
@@ -188,9 +182,7 @@ class TestDagFileProcessor:
 
     def test_failure_callbacks_should_not_drop_hostname(self):
         dagbag = DagBag(dag_folder="/dev/null", include_examples=True, read_dags_from_db=False)
-        dag_file_processor = DagFileProcessor(
-            dag_ids=[], dag_directory=TEST_DAGS_FOLDER, log=mock.MagicMock()
-        )
+        dag_file_processor = DagFileProcessor(dag_directory=TEST_DAGS_FOLDER, log=mock.MagicMock())
         dag_file_processor.UNIT_TEST_MODE = False
 
         with create_session() as session:
@@ -224,9 +216,7 @@ class TestDagFileProcessor:
         callback_file = tmp_path.joinpath("callback.txt")
         callback_file.touch()
         monkeypatch.setenv("AIRFLOW_CALLBACK_FILE", str(callback_file))
-        dag_file_processor = DagFileProcessor(
-            dag_ids=[], dag_directory=TEST_DAGS_FOLDER, log=mock.MagicMock()
-        )
+        dag_file_processor = DagFileProcessor(dag_directory=TEST_DAGS_FOLDER, log=mock.MagicMock())
 
         dag = get_test_dag("test_on_failure_callback")
         task = dag.get_task(task_id="test_on_failure_callback_task")
@@ -576,7 +566,6 @@ class TestDagFileProcessor:
     def test_dag_parser_output_when_logging_to_stdout(self, mock_redirect_stdout_for_file):
         processor = DagFileProcessorProcess(
             file_path="abc.txt",
-            dag_ids=[],
             dag_directory=[],
             callback_requests=[],
         )
@@ -584,7 +573,6 @@ class TestDagFileProcessor:
             result_channel=MagicMock(),
             parent_channel=MagicMock(),
             file_path="fake_file_path",
-            dag_ids=[],
             thread_name="fake_thread_name",
             callback_requests=[],
             dag_directory=[],
@@ -597,7 +585,6 @@ class TestDagFileProcessor:
     def test_dag_parser_output_when_logging_to_file(self, mock_redirect_stdout_for_file):
         processor = DagFileProcessorProcess(
             file_path="abc.txt",
-            dag_ids=[],
             dag_directory=[],
             callback_requests=[],
         )
@@ -605,7 +592,6 @@ class TestDagFileProcessor:
             result_channel=MagicMock(),
             parent_channel=MagicMock(),
             file_path="fake_file_path",
-            dag_ids=[],
             thread_name="fake_thread_name",
             callback_requests=[],
             dag_directory=[],
@@ -622,7 +608,6 @@ class TestDagFileProcessor:
 
         processor = DagFileProcessorProcess(
             file_path=zip_filename,
-            dag_ids=[],
             dag_directory=[],
             callback_requests=[],
         )
@@ -638,7 +623,6 @@ class TestDagFileProcessor:
 
         processor = DagFileProcessorProcess(
             file_path=dag_filename,
-            dag_ids=[],
             dag_directory=[],
             callback_requests=[],
         )
diff --git a/tests/listeners/test_dag_import_error_listener.py b/tests/listeners/test_dag_import_error_listener.py
index ff63d141c78..aa085d3cfd7 100644
--- a/tests/listeners/test_dag_import_error_listener.py
+++ b/tests/listeners/test_dag_import_error_listener.py
@@ -95,9 +95,7 @@ class TestDagFileProcessor:
         self.clean_db()
 
     def _process_file(self, file_path, dag_directory, session):
-        dag_file_processor = DagFileProcessor(
-            dag_ids=[], dag_directory=str(dag_directory), log=mock.MagicMock()
-        )
+        dag_file_processor = DagFileProcessor(dag_directory=str(dag_directory), log=mock.MagicMock())
 
         dag_file_processor.process_file(file_path, [])
 
