ephraimbuddy commented on code in PR #66484:
URL: https://github.com/apache/airflow/pull/66484#discussion_r3209881642


##########
airflow-core/src/airflow/dag_processing/manager.py:
##########
@@ -1008,26 +1013,26 @@ def handle_removed_files(self, known_files: dict[str, 
set[DagFileInfo]]):
         for v in known_files.values():
             files_set |= v
 
-        self.purge_removed_files_from_queue(present=files_set)
-        self.terminate_orphan_processes(present=files_set)
-        self.remove_orphaned_file_stats(present=files_set)
+        present_keys = {file.presence_key for file in files_set}
+        self.purge_removed_files_from_queue(present_keys=present_keys)
+        self.terminate_orphan_processes(present_keys=present_keys)
+        self.remove_orphaned_file_stats(present_keys=present_keys)
 
-    def purge_removed_files_from_queue(self, present: set[DagFileInfo]):
+    def purge_removed_files_from_queue(self, present_keys: set[tuple[str, 
Path]]):

Review Comment:
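   This is a public method, so renaming its parameter from `present` to
   `present_keys` (and changing the parameter's type) would break any subclass
   that overrides it, as well as any caller that passes `present=` by keyword.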



##########
airflow-core/src/airflow/dag_processing/manager.py:
##########
@@ -1008,26 +1013,26 @@ def handle_removed_files(self, known_files: dict[str, 
set[DagFileInfo]]):
         for v in known_files.values():
             files_set |= v
 
-        self.purge_removed_files_from_queue(present=files_set)
-        self.terminate_orphan_processes(present=files_set)
-        self.remove_orphaned_file_stats(present=files_set)
+        present_keys = {file.presence_key for file in files_set}
+        self.purge_removed_files_from_queue(present_keys=present_keys)
+        self.terminate_orphan_processes(present_keys=present_keys)
+        self.remove_orphaned_file_stats(present_keys=present_keys)
 
-    def purge_removed_files_from_queue(self, present: set[DagFileInfo]):
+    def purge_removed_files_from_queue(self, present_keys: set[tuple[str, 
Path]]):
         """Remove from queue any files no longer observed locally."""
-        self._file_queue = deque(x for x in self._file_queue if x in present)
+        self._file_queue = deque(x for x in self._file_queue if x.presence_key 
in present_keys)
         stats.gauge("dag_processing.file_path_queue_size", 
len(self._file_queue))
 
-    def remove_orphaned_file_stats(self, present: set[DagFileInfo]):
+    def remove_orphaned_file_stats(self, present_keys: set[tuple[str, Path]]):

Review Comment:
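   Same concern here: `remove_orphaned_file_stats` is a public method, so
   renaming its `present` parameter to `present_keys` would break any subclass
   that overrides it.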



##########
airflow-core/src/airflow/dag_processing/manager.py:
##########
@@ -1008,26 +1013,26 @@ def handle_removed_files(self, known_files: dict[str, 
set[DagFileInfo]]):
         for v in known_files.values():
             files_set |= v
 
-        self.purge_removed_files_from_queue(present=files_set)
-        self.terminate_orphan_processes(present=files_set)
-        self.remove_orphaned_file_stats(present=files_set)
+        present_keys = {file.presence_key for file in files_set}
+        self.purge_removed_files_from_queue(present_keys=present_keys)
+        self.terminate_orphan_processes(present_keys=present_keys)
+        self.remove_orphaned_file_stats(present_keys=present_keys)
 
-    def purge_removed_files_from_queue(self, present: set[DagFileInfo]):
+    def purge_removed_files_from_queue(self, present_keys: set[tuple[str, 
Path]]):
         """Remove from queue any files no longer observed locally."""
-        self._file_queue = deque(x for x in self._file_queue if x in present)
+        self._file_queue = deque(x for x in self._file_queue if x.presence_key 
in present_keys)
         stats.gauge("dag_processing.file_path_queue_size", 
len(self._file_queue))
 
-    def remove_orphaned_file_stats(self, present: set[DagFileInfo]):
+    def remove_orphaned_file_stats(self, present_keys: set[tuple[str, Path]]):
         """Remove the stats for any dag files that don't exist anymore."""
-        # todo: store stats by bundle also?
-        stats_to_remove = set(self._file_stats).difference(present)
+        stats_to_remove = {file for file in self._file_stats if 
file.presence_key not in present_keys}
         for file in stats_to_remove:
             del self._file_stats[file]
 
-    def terminate_orphan_processes(self, present: set[DagFileInfo]):
+    def terminate_orphan_processes(self, present_keys: set[tuple[str, Path]]):

Review Comment:
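   Ditto: `terminate_orphan_processes` is a public method, so renaming its
   `present` parameter to `present_keys` would break any subclass that
   overrides it.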



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to