ephraimbuddy commented on code in PR #45860: URL: https://github.com/apache/airflow/pull/45860#discussion_r1926204600
########## airflow/dag_processing/manager.py: ########## @@ -388,18 +391,18 @@ def _fetch_callbacks( def _add_callback_to_queue(self, request: CallbackRequest): self.log.debug("Queuing %s CallbackRequest: %s", type(request).__name__, request) - self.log.warning("Callbacks are not implemented yet!") - # TODO: AIP-66 make callbacks bundle aware - return - self._callback_to_execute[request.full_filepath].append(request) - if request.full_filepath in self._file_path_queue: - # Remove file paths matching request.full_filepath from self._file_path_queue + bundle = DagBundlesManager().get_bundle(name=request.bundle_name, version=request.bundle_version) + dag_absolute_path = os.fspath(Path(bundle.path, request.filepath)) + file_info = DagFileInfo(path=dag_absolute_path, bundle_name=request.bundle_name) + self._callback_to_execute[file_info].append(request) + if file_info in self._file_path_queue: + # Remove file paths matching request.filepath from self._file_path_queue # Since we are already going to use that filepath to run callback, # there is no need to have same file path again in the queue Review Comment: > There is a reason now - we are running the callback on an old bundle version now, so we should leave the existing entry in the queue so the latest file is still parsed when it is its turn. I have added bundle_version to DagFileInfo, which means versions will be differentiated in the queue. We can keep it this way. WDYT? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org For queries about this service, please contact Infrastructure at: users@infra.apache.org