hkc-8010 commented on code in PR #66484:
URL: https://github.com/apache/airflow/pull/66484#discussion_r3226457406


##########
airflow-core/src/airflow/dag_processing/manager.py:
##########
@@ -1261,11 +1268,14 @@ def _add_new_files_to_queue(self, known_files: dict[str, set[DagFileInfo]]):
         A "new" file is a file that has not been processed yet and is not currently being processed.
         """
         new_files = []
+        tracked_presence_keys = {file.presence_key for file in self._file_queue}
+        tracked_presence_keys.update(file.presence_key for file in self._file_stats)
+        tracked_presence_keys.update(file.presence_key for file in self._processors)
         for files in known_files.values():
             for file in files:
-                # todo: store stats by bundle also?

Review Comment:
   @ephraimbuddy I do not think that TODO is applicable to this path anymore 
after the `presence_key` change. `_add_new_files_to_queue()` is now 
intentionally checking whether a file is already represented by `bundle_name + 
rel_path` across the queue, processors, and stats, so an unversioned scan 
result will not be re-added while the same file is already tracked under a 
versioned `DagFileInfo`.
   
   We still keep the full `DagFileInfo` identity, including `bundle_version`, 
for the queue, processor, and stats entries themselves. The narrower presence 
key is used only for these manager-side “already represented” checks, which is 
the behavior this PR fixes. Since the old TODO is no longer present in the 
latest diff and the path now has the explicit presence-key behavior, I am 
resolving this thread.
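
   For readers of this thread, here is a minimal sketch of the distinction 
between full identity and the presence key. `FileInfo` and 
`files_to_enqueue` are hypothetical stand-ins written for illustration; they 
are not the actual `DagFileInfo` class or manager method, and the field layout 
is an assumption based only on the `bundle_name + rel_path` description above.

```python
from dataclasses import dataclass
from pathlib import Path
from typing import Optional


@dataclass(frozen=True)
class FileInfo:
    """Hypothetical stand-in for DagFileInfo (assumed fields, not Airflow's class)."""

    bundle_name: str
    rel_path: Path
    bundle_version: Optional[str] = None

    @property
    def presence_key(self) -> tuple:
        # Narrow key: deliberately ignores bundle_version.
        return (self.bundle_name, self.rel_path)


def files_to_enqueue(known_files, file_queue, file_stats, processors):
    """Return scanned files not yet represented in the queue, stats, or processors."""
    tracked = {f.presence_key for f in file_queue}
    tracked.update(f.presence_key for f in file_stats)
    tracked.update(f.presence_key for f in processors)
    return [
        f
        for files in known_files.values()
        for f in files
        if f.presence_key not in tracked
    ]


versioned = FileInfo("bundle", Path("dags/a.py"), "v1")
unversioned = FileInfo("bundle", Path("dags/a.py"))
brand_new = FileInfo("bundle", Path("dags/b.py"))

# Full identity still distinguishes the two entries...
assert versioned != unversioned
# ...but the unversioned scan result is not re-added while the versioned one is tracked.
result = files_to_enqueue(
    {"bundle": {unversioned, brand_new}}, [], {versioned: None}, {}
)
```

   In this sketch only `brand_new` is returned, because the unversioned scan 
result shares a presence key with the versioned entry already held in the 
stats mapping.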



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
