kaxil commented on code in PR #63617:
URL: https://github.com/apache/airflow/pull/63617#discussion_r2942976958


##########
airflow-core/src/airflow/dag_processing/manager.py:
##########
@@ -694,7 +694,7 @@ def _refresh_dag_bundles(self, known_files: dict[str, set[DagFileInfo]]):
             self.deactivate_deleted_dags(bundle_name=bundle.name, present=found_files)
             self.clear_orphaned_import_errors(
                 bundle_name=bundle.name,
-                observed_filelocs={str(x.rel_path) for x in found_files},  # todo: make relative
+                observed_filelocs=self._get_observed_filelocs(found_files),

Review Comment:
   `_get_observed_filelocs(found_files)` opens and reads every zip archive in the bundle. But `deactivate_deleted_dags` (line 694) also calls `_get_observed_filelocs` internally with the same `found_files` set (see line 748). So each zip gets opened and scanned twice per refresh cycle.
   
   You could compute the result once here and pass it into both `deactivate_deleted_dags` and `clear_orphaned_import_errors`.
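A minimal, self-contained sketch of this suggestion, built on simplified stand-ins: `FileInfo` is a hypothetical substitute for `DagFileInfo`, `find_zipped_py_files` treats every `.py` member as a DAG file (the hunk does not show how `find_zipped_dags` filters members), and `refresh_bundle` with its two callbacks is a hypothetical reduction of `_refresh_dag_bundles`. The expansion loop mirrors the `+` lines of the diff; the point is that the set is built once and the same object reaches both consumers, with a counter making the number of archive opens visible.

```python
import os
import tempfile
import zipfile
from dataclasses import dataclass
from pathlib import Path
from typing import Callable, Iterator

# Counts archive opens so the "expand once" effect is observable (sketch only).
ZIP_OPENS = {"count": 0}


@dataclass(frozen=True)
class FileInfo:
    """Hypothetical stand-in for DagFileInfo with only the fields used below."""

    rel_path: Path
    bundle_path: Path

    @property
    def absolute_path(self) -> Path:
        return self.bundle_path / self.rel_path


def find_zipped_py_files(abs_path: Path) -> Iterator[str]:
    """Yield '<zip path>/<member>' for each .py member (assumed filter)."""
    try:
        with zipfile.ZipFile(abs_path) as zf:
            ZIP_OPENS["count"] += 1
            for name in zf.namelist():
                if name.endswith(".py"):
                    yield os.path.join(abs_path, name)
    except zipfile.BadZipFile:
        pass  # Airflow logs this; the sketch just skips the archive.


def get_observed_filelocs(present: set[FileInfo]) -> set[str]:
    """Map plain files to their relative path and expand zips into member paths."""
    observed: set[str] = set()
    for info in present:
        abs_path = str(info.absolute_path)
        if abs_path.endswith(".py") or not zipfile.is_zipfile(abs_path):
            observed.add(str(info.rel_path))
        else:
            for abs_sub_path in find_zipped_py_files(info.absolute_path):
                observed.add(str(Path(abs_sub_path).relative_to(info.bundle_path)))
    return observed


def refresh_bundle(
    found_files: set[FileInfo],
    deactivate: Callable[[set[str]], None],
    clear_errors: Callable[[set[str]], None],
) -> set[str]:
    """Hypothetical reduction of _refresh_dag_bundles: expand once, share the result."""
    observed = get_observed_filelocs(found_files)
    deactivate(observed)
    clear_errors(observed)
    return observed


def demo() -> tuple[set[str], int, list[set[str]]]:
    """Build a throwaway bundle with one .py file and one zip, then refresh it."""
    ZIP_OPENS["count"] = 0
    received: list[set[str]] = []
    with tempfile.TemporaryDirectory() as tmp:
        bundle = Path(tmp)
        (bundle / "plain_dag.py").write_text("# a DAG file\n")
        with zipfile.ZipFile(bundle / "pkg.zip", "w") as zf:
            zf.writestr("zipped_dag.py", "# a zipped DAG\n")
            zf.writestr("README.txt", "not a DAG\n")
        found = {FileInfo(Path("plain_dag.py"), bundle), FileInfo(Path("pkg.zip"), bundle)}
        observed = refresh_bundle(found, received.append, received.append)
    return observed, ZIP_OPENS["count"], received
```

In this sketch `demo()` reports a single open for the single zip in the bundle; having each consumer call `get_observed_filelocs` itself would open it twice.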



##########
airflow-core/src/airflow/dag_processing/manager.py:
##########
@@ -730,17 +729,23 @@ def find_zipped_dags(abs_path: os.PathLike) -> Iterator[str]:
             except zipfile.BadZipFile:
                 self.log.exception("There was an error accessing ZIP file %s", abs_path)
 
-        rel_filelocs: list[str] = []
+        observed_filelocs: set[str] = set()
         for info in present:
             abs_path = str(info.absolute_path)
             if abs_path.endswith(".py") or not zipfile.is_zipfile(abs_path):
-                rel_filelocs.append(str(info.rel_path))
+                observed_filelocs.add(str(info.rel_path))
             else:
                 if TYPE_CHECKING:
                     assert info.bundle_path
                 for abs_sub_path in find_zipped_dags(abs_path=info.absolute_path):
                     rel_sub_path = Path(abs_sub_path).relative_to(info.bundle_path)
-                    rel_filelocs.append(str(rel_sub_path))
+                    observed_filelocs.add(str(rel_sub_path))
+
+        return observed_filelocs
+
+    def deactivate_deleted_dags(self, bundle_name: str, present: set[DagFileInfo]) -> None:
+        """Deactivate DAGs that come from files that are no longer present in bundle."""
+        rel_filelocs = list(self._get_observed_filelocs(present))

Review Comment:
   This calls `_get_observed_filelocs(present)`, but the caller (`_refresh_dag_bundles`) already calls it with the same `found_files` on line 697. Consider accepting the pre-computed `observed_filelocs` as a parameter instead, so the zip expansion happens only once per bundle.
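On the callee side, one possible shape for the suggested signature, sketched with hypothetical stand-ins: `InMemoryDagRegistry` replaces the database and `Manager` is a trimmed-down manager; neither exists in Airflow. Here `deactivate_deleted_dags` takes the already-expanded `observed_filelocs` set in place of `present`, so it never opens an archive itself.

```python
from dataclasses import dataclass, field


@dataclass
class InMemoryDagRegistry:
    """Hypothetical stand-in for the DAG table: bundle name -> {relative fileloc: is_active}."""

    active: dict[str, dict[str, bool]] = field(default_factory=dict)

    def deactivate_missing(self, bundle_name: str, rel_filelocs: list[str]) -> None:
        """Mark every active fileloc of the bundle that is not in rel_filelocs as inactive."""
        keep = set(rel_filelocs)
        files = self.active.get(bundle_name, {})
        gone = [loc for loc, is_active in files.items() if is_active and loc not in keep]
        for loc in gone:
            files[loc] = False


class Manager:
    """Hypothetical, trimmed-down manager showing only the suggested signature."""

    def __init__(self, registry: InMemoryDagRegistry) -> None:
        self.registry = registry

    def deactivate_deleted_dags(self, bundle_name: str, observed_filelocs: set[str]) -> None:
        """Deactivate DAGs whose files are absent from the pre-computed observed_filelocs.

        Zip expansion already happened in the caller, so no archive is opened here.
        """
        self.registry.deactivate_missing(bundle_name, list(observed_filelocs))
```

Replacing `present` outright means any other caller would need to compute the set first; keeping `present` as an optional fallback would avoid that, at the cost of leaving a path where the expansion can still run twice.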



-- 
This is an automated message from the Apache Git Service.