hkc-8010 commented on code in PR #66484:
URL: https://github.com/apache/airflow/pull/66484#discussion_r3212466130


##########
airflow-core/src/airflow/dag_processing/manager.py:
##########
@@ -1008,26 +1013,26 @@ def handle_removed_files(self, known_files: dict[str, 
set[DagFileInfo]]):
         for v in known_files.values():
             files_set |= v
 
-        self.purge_removed_files_from_queue(present=files_set)
-        self.terminate_orphan_processes(present=files_set)
-        self.remove_orphaned_file_stats(present=files_set)
+        present_keys = {file.presence_key for file in files_set}
+        self.purge_removed_files_from_queue(present_keys=present_keys)
+        self.terminate_orphan_processes(present_keys=present_keys)
+        self.remove_orphaned_file_stats(present_keys=present_keys)
 
-    def purge_removed_files_from_queue(self, present: set[DagFileInfo]):
+    def purge_removed_files_from_queue(self, present_keys: set[tuple[str, 
Path]]):

Review Comment:
   @ephraimbuddy Addressed in `a328aae17ba`. I restored the public method 
signatures so they still take `present: set[DagFileInfo]` and moved the 
presence-key conversion into private helpers, so subclasses keep the same 
override/call surface. I also updated the tests and reran `pytest 
airflow-core/tests/unit/dag_processing/test_manager.py -q` plus the three 
Breeze variants; every run reported `105 passed, 1 warning`.



##########
airflow-core/src/airflow/dag_processing/manager.py:
##########
@@ -1008,26 +1013,26 @@ def handle_removed_files(self, known_files: dict[str, 
set[DagFileInfo]]):
         for v in known_files.values():
             files_set |= v
 
-        self.purge_removed_files_from_queue(present=files_set)
-        self.terminate_orphan_processes(present=files_set)
-        self.remove_orphaned_file_stats(present=files_set)
+        present_keys = {file.presence_key for file in files_set}
+        self.purge_removed_files_from_queue(present_keys=present_keys)
+        self.terminate_orphan_processes(present_keys=present_keys)
+        self.remove_orphaned_file_stats(present_keys=present_keys)
 
-    def purge_removed_files_from_queue(self, present: set[DagFileInfo]):
+    def purge_removed_files_from_queue(self, present_keys: set[tuple[str, 
Path]]):
         """Remove from queue any files no longer observed locally."""
-        self._file_queue = deque(x for x in self._file_queue if x in present)
+        self._file_queue = deque(x for x in self._file_queue if x.presence_key 
in present_keys)
         stats.gauge("dag_processing.file_path_queue_size", 
len(self._file_queue))
 
-    def remove_orphaned_file_stats(self, present: set[DagFileInfo]):
+    def remove_orphaned_file_stats(self, present_keys: set[tuple[str, Path]]):

Review Comment:
   Applied the same compatibility fix here as part of `a328aae17ba`: 
`remove_orphaned_file_stats` takes `present: set[DagFileInfo]` again.



##########
airflow-core/src/airflow/dag_processing/manager.py:
##########
@@ -1008,26 +1013,26 @@ def handle_removed_files(self, known_files: dict[str, 
set[DagFileInfo]]):
         for v in known_files.values():
             files_set |= v
 
-        self.purge_removed_files_from_queue(present=files_set)
-        self.terminate_orphan_processes(present=files_set)
-        self.remove_orphaned_file_stats(present=files_set)
+        present_keys = {file.presence_key for file in files_set}
+        self.purge_removed_files_from_queue(present_keys=present_keys)
+        self.terminate_orphan_processes(present_keys=present_keys)
+        self.remove_orphaned_file_stats(present_keys=present_keys)
 
-    def purge_removed_files_from_queue(self, present: set[DagFileInfo]):
+    def purge_removed_files_from_queue(self, present_keys: set[tuple[str, 
Path]]):
         """Remove from queue any files no longer observed locally."""
-        self._file_queue = deque(x for x in self._file_queue if x in present)
+        self._file_queue = deque(x for x in self._file_queue if x.presence_key 
in present_keys)
         stats.gauge("dag_processing.file_path_queue_size", 
len(self._file_queue))
 
-    def remove_orphaned_file_stats(self, present: set[DagFileInfo]):
+    def remove_orphaned_file_stats(self, present_keys: set[tuple[str, Path]]):
         """Remove the stats for any dag files that don't exist anymore."""
-        # todo: store stats by bundle also?
-        stats_to_remove = set(self._file_stats).difference(present)
+        stats_to_remove = {file for file in self._file_stats if 
file.presence_key not in present_keys}
         for file in stats_to_remove:
             del self._file_stats[file]
 
-    def terminate_orphan_processes(self, present: set[DagFileInfo]):
+    def terminate_orphan_processes(self, present_keys: set[tuple[str, Path]]):

Review Comment:
   Applied the same compatibility fix here as part of `a328aae17ba`: 
`terminate_orphan_processes` takes `present: set[DagFileInfo]` again.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to