ephraimbuddy commented on code in PR #66484:
URL: https://github.com/apache/airflow/pull/66484#discussion_r3220819225


##########
airflow-core/src/airflow/dag_processing/manager.py:
##########
@@ -1335,7 +1371,8 @@ def prepare_file_queue(self, known_files: dict[str, set[DagFileInfo]]):
 
         # If the file path is already being processed, or if a file was
         # processed recently, wait until the next batch
-        in_progress = set(self._processors)
+        in_progress_keys = {file.presence_key for file in self._processors}
+        file_stats_by_presence_key = {file.presence_key: stat for file, stat in self._file_stats.items()}

Review Comment:
   This dict is built here but not used by the `processed_recently(now, file)` call ten lines below. That call still does an O(n) linear scan over `_file_stats` for each file, which makes `prepare_file_queue` O(n²) overall.
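
   A minimal, self-contained sketch of the difference. It uses hypothetical `FileInfo`/`FileStat` stand-ins and assumes a `presence_key` that ignores the bundle version; the real key fields come from the PR and may differ. A per-file scan touches every stat entry, while the prebuilt dict answers each lookup with a single hash probe.

```python
from __future__ import annotations

from dataclasses import dataclass
from datetime import datetime


# Hypothetical stand-ins for DagFileInfo and the per-file stat record; the
# fields that make up presence_key are assumed here, not taken from the PR.
@dataclass(frozen=True)
class FileInfo:
    bundle_name: str
    rel_path: str
    bundle_version: str | None = None

    @property
    def presence_key(self) -> tuple[str, str]:
        # Assumed: identity that ignores bundle_version.
        return (self.bundle_name, self.rel_path)


@dataclass
class FileStat:
    last_finish_time: datetime | None = None


def stat_by_scan(file_stats: dict[FileInfo, FileStat], file: FileInfo) -> FileStat | None:
    # O(n) per call: walks every entry, so calling it once per file is O(n^2).
    for known, stat in file_stats.items():
        if known.presence_key == file.presence_key:
            return stat
    return None


def build_index(file_stats: dict[FileInfo, FileStat]) -> dict[tuple[str, str], FileStat]:
    # Built once per prepare_file_queue pass: O(n). On duplicate keys the
    # last entry wins, whereas stat_by_scan returns the first match.
    return {known.presence_key: stat for known, stat in file_stats.items()}


def stat_by_index(index: dict[tuple[str, str], FileStat], file: FileInfo) -> FileStat | None:
    # O(1) on average: a single hash lookup.
    return index.get(file.presence_key)


stats = {FileInfo("dags", "a.py", "v1"): FileStat(datetime(2024, 1, 1))}
probe = FileInfo("dags", "a.py", "v2")  # same file, newer bundle version
index = build_index(stats)
assert stat_by_scan(stats, probe) is stat_by_index(index, probe)
```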
   
   Inline the lookup at the call site using this dict, and leave the signature of `processed_recently` untouched so any subclass override or external caller still works:
   
   ```python
   for bundle_files in known_files.values():
       for file in bundle_files:
           files.append(file)
           # O(1) dict lookup instead of a linear scan over self._file_stats
           stat = file_stats_by_presence_key.get(file.presence_key)
           last_time = stat.last_finish_time if stat else None
           if last_time and (now - last_time).total_seconds() < self._file_process_interval:
               recently_processed.add(file)
   ```
   
   ---
   Drafted-by: Claude Code (Opus 4.7); reviewed by @ephraimbuddy before posting



##########
airflow-core/src/airflow/dag_processing/manager.py:
##########
@@ -1008,26 +1013,38 @@ def handle_removed_files(self, known_files: dict[str, set[DagFileInfo]]):
         for v in known_files.values():
             files_set |= v
 
-        self.purge_removed_files_from_queue(present=files_set)
-        self.terminate_orphan_processes(present=files_set)
-        self.remove_orphaned_file_stats(present=files_set)
+        present_keys = {file.presence_key for file in files_set}
+        self._purge_removed_files_from_queue(present_keys=present_keys)

Review Comment:
   Calling the underscore variant here bypasses any subclass override of the public `purge_removed_files_from_queue`. If a subclass added logging, metrics, or custom filtering to the public method, `handle_removed_files` previously invoked that override; after this change it no longer does. The same applies to the next two lines.
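
   A toy illustration of the bypass, using hypothetical classes rather than the real manager. The subclass overrides the public method, but `handle_removed_files` calls the private helper, so the override is never reached.

```python
# Hypothetical classes, not Airflow code: plain strings stand in for
# DagFileInfo, and the strings themselves stand in for presence keys.
class BaseManager:
    def purge_removed_files_from_queue(self, present):
        # Public extension point that subclasses may override.
        self._purge_removed_files_from_queue(present_keys=set(present))

    def _purge_removed_files_from_queue(self, present_keys):
        self.purged_with = present_keys

    def handle_removed_files(self, files):
        # Calls the private helper directly, so the public method is skipped.
        self._purge_removed_files_from_queue(present_keys=set(files))


class MetricsManager(BaseManager):
    def __init__(self):
        self.calls = []

    def purge_removed_files_from_queue(self, present):
        self.calls.append("purge")  # e.g. logging or a metric
        super().purge_removed_files_from_queue(present)


m = MetricsManager()
m.handle_removed_files({"a.py"})
print(m.calls)  # prints []: the override never ran
```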
   
   Suggest collapsing to a single public method per concern, with the `present → present_keys` conversion at the top, and calling the public method from here:
   
   ```python
   self.purge_removed_files_from_queue(present=files_set)
   self.terminate_orphan_processes(present=files_set)
   self.remove_orphaned_file_stats(present=files_set)
   ```
   
   The 3× redundant set construction is negligible at realistic scales and worth it to keep the extension point intact.
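
   A sketch of the suggested shape, again with hypothetical stand-ins (`FileInfo`, a list-backed queue) rather than the real manager. The public method keeps its `present=` signature and does the key conversion itself, and `handle_removed_files` calls it, so a subclass override still runs.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass(frozen=True)
class FileInfo:  # hypothetical stand-in for DagFileInfo
    bundle_name: str
    rel_path: str

    @property
    def presence_key(self) -> tuple[str, str]:
        return (self.bundle_name, self.rel_path)


class Manager:
    def __init__(self, queue: list[FileInfo]) -> None:
        # A list stands in for the manager's real file queue.
        self._file_queue = list(queue)

    def purge_removed_files_from_queue(self, present: set[FileInfo]) -> None:
        # Conversion at the top of the public method, so callers and
        # subclass overrides keep the original `present=` signature.
        present_keys = {f.presence_key for f in present}
        self._file_queue = [f for f in self._file_queue if f.presence_key in present_keys]

    def handle_removed_files(self, known_files: dict[str, set[FileInfo]]) -> None:
        files_set: set[FileInfo] = set()
        for v in known_files.values():
            files_set |= v
        # Go through the public method so overrides are honoured.
        self.purge_removed_files_from_queue(present=files_set)


class CountingManager(Manager):
    def __init__(self, queue: list[FileInfo]) -> None:
        super().__init__(queue)
        self.purge_calls = 0

    def purge_removed_files_from_queue(self, present: set[FileInfo]) -> None:
        self.purge_calls += 1
        super().purge_removed_files_from_queue(present)


a, b = FileInfo("dags", "a.py"), FileInfo("dags", "b.py")
m = CountingManager([a, b])
m.handle_removed_files({"dags": {a}})  # b.py is no longer present
```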
   
   ---
   Drafted-by: Claude Code (Opus 4.7); reviewed by @ephraimbuddy before posting



-- 
This is an automated message from the Apache Git Service.