jedcunningham commented on code in PR #45860: URL: https://github.com/apache/airflow/pull/45860#discussion_r1926264772
########## airflow/dag_processing/manager.py: ########## @@ -398,18 +406,20 @@ def _fetch_callbacks( def _add_callback_to_queue(self, request: CallbackRequest): self.log.debug("Queuing %s CallbackRequest: %s", type(request).__name__, request) - self.log.warning("Callbacks are not implemented yet!") - # TODO: AIP-66 make callbacks bundle aware - return - self._callback_to_execute[request.full_filepath].append(request) - if request.full_filepath in self._file_path_queue: - # Remove file paths matching request.full_filepath from self._file_path_queue - # Since we are already going to use that filepath to run callback, - # there is no need to have same file path again in the queue + bundle = self._bundles_manager.get_bundle(name=request.bundle_name, version=request.bundle_version) + dag_absolute_path = os.fspath(Path(bundle.path, request.filepath)) + file_info = DagFileInfo( + path=dag_absolute_path, bundle_name=request.bundle_name, bundle_version=request.bundle_version + ) + self._callback_to_execute[file_info].append(request) + if file_info in self._file_path_queue: + # Remove DagFileInfo matching DagFileInfo from self._file_path_queue + # Since we are already going to use that DagFileInfo to run callback, + # there is no need to have same DagFileInfo again in the queue self._file_path_queue = deque( - file_path for file_path in self._file_path_queue if file_path != request.full_filepath + file_path for file_path in self._file_path_queue if file_path != file_info ) Review Comment: We don't need to do this anymore. Leave the existing entry, if it's in there, in there. Just add the versioned one to run the callback. ########## airflow/dag_processing/manager.py: ########## @@ -263,7 +269,9 @@ def deactivate_stale_dags( # last_parsed_time is the processor_timeout. Longer than that indicates that the DAG is # no longer present in the file. 
We have a stale_dag_threshold configured to prevent a # significant delay in deactivation of stale dags when a large timeout is configured - dag_file_path = DagFileInfo(path=dag.fileloc, bundle_name=dag.bundle_name) + dag_file_path = DagFileInfo( + path=dag.fileloc, bundle_name=dag.bundle_name, bundle_version=dag.bundle_version Review Comment: I don't think we want to have the version here - we want to see if the DAG is still present in the "latest" (a.k.a. `None`) version. ########## airflow/dag_processing/manager.py: ########## @@ -219,12 +223,10 @@ def run(self): # "Checking for new files in %s every %s seconds", self._dag_directory, self.dag_dir_list_interval # ) - from airflow.dag_processing.bundles.manager import DagBundlesManager - - DagBundlesManager().sync_bundles_to_db() + self._bundles_manager = DagBundlesManager().sync_bundles_to_db() Review Comment: ```suggestion self._bundles_manager.sync_bundles_to_db() ``` Doesn't the factory create it for us? ########## airflow/dag_processing/manager.py: ########## @@ -477,7 +491,8 @@ def _refresh_dag_bundles(self): new_file_paths = [f for f in self._file_paths if f.bundle_name != bundle.name] new_file_paths.extend( - DagFileInfo(path=path, bundle_name=bundle.name) for path in bundle_file_paths + DagFileInfo(path=path, bundle_name=bundle.name, bundle_version=bundle_model.version) Review Comment: Same here. We want `_file_stat` to be "versionless", which is easier if we just leave these all as `None`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org