gopidesupavan commented on code in PR #63617:
URL: https://github.com/apache/airflow/pull/63617#discussion_r2943051660
##########
airflow-core/src/airflow/dag_processing/manager.py:
##########
@@ -730,17 +729,23 @@ def find_zipped_dags(abs_path: os.PathLike) ->
Iterator[str]:
except zipfile.BadZipFile:
self.log.exception("There was an error accessing ZIP file %s",
abs_path)
- rel_filelocs: list[str] = []
+ observed_filelocs: set[str] = set()
for info in present:
abs_path = str(info.absolute_path)
if abs_path.endswith(".py") or not zipfile.is_zipfile(abs_path):
- rel_filelocs.append(str(info.rel_path))
+ observed_filelocs.add(str(info.rel_path))
else:
if TYPE_CHECKING:
assert info.bundle_path
for abs_sub_path in
find_zipped_dags(abs_path=info.absolute_path):
rel_sub_path =
Path(abs_sub_path).relative_to(info.bundle_path)
- rel_filelocs.append(str(rel_sub_path))
+ observed_filelocs.add(str(rel_sub_path))
+
+ return observed_filelocs
+
+ def deactivate_deleted_dags(self, bundle_name: str, present:
set[DagFileInfo]) -> None:
+ """Deactivate DAGs that come from files that are no longer present in
bundle."""
+ rel_filelocs = list(self._get_observed_filelocs(present))
Review Comment:
Makes sense ;)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]