jedcunningham commented on code in PR #45860:
URL: https://github.com/apache/airflow/pull/45860#discussion_r1926270139


##########
airflow/dag_processing/manager.py:
##########
@@ -388,18 +391,18 @@ def _fetch_callbacks(
 
     def _add_callback_to_queue(self, request: CallbackRequest):
         self.log.debug("Queuing %s CallbackRequest: %s", type(request).__name__, request)
-        self.log.warning("Callbacks are not implemented yet!")
-        # TODO: AIP-66 make callbacks bundle aware
-        return
-        self._callback_to_execute[request.full_filepath].append(request)
-        if request.full_filepath in self._file_path_queue:
-            # Remove file paths matching request.full_filepath from self._file_path_queue
+        bundle = DagBundlesManager().get_bundle(name=request.bundle_name, version=request.bundle_version)
+        dag_absolute_path = os.fspath(Path(bundle.path, request.filepath))
+        file_info = DagFileInfo(path=dag_absolute_path, bundle_name=request.bundle_name)
+        self._callback_to_execute[file_info].append(request)
+        if file_info in self._file_path_queue:
+            # Remove file paths matching request.filepath from self._file_path_queue
             # Since we are already going to use that filepath to run callback,
             # there is no need to have same file path again in the queue

Review Comment:
   Adding bundle_version to DagFileInfo sounds good; it's a good way to
differentiate them. But I still don't think we should remove the entries
from the queue. I left another comment below about this too.
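
To illustrate the point, here is a minimal sketch, not the actual Airflow code. `FileInfo` is a hypothetical stand-in for DagFileInfo, the `bundle_version` field is the proposed addition, and `callbacks` and `queue` are simplified stand-ins for the manager's internal state. Assuming the key is a frozen dataclass, a versioned entry compares unequal to the unversioned one for the same path, so both can sit in the queue and nothing needs to be removed.

```python
from collections import defaultdict, deque
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class FileInfo:
    """Hypothetical stand-in for DagFileInfo (not the real Airflow class)."""

    path: str
    bundle_name: str
    bundle_version: Optional[str] = None  # assumed new field


# Simplified stand-ins for the manager's callback map and file queue.
callbacks = defaultdict(list)
queue = deque()

# A regular (unversioned) parse of the file is already queued.
regular = FileInfo(path="dags/example.py", bundle_name="main")
queue.append(regular)

# A callback arrives for a specific bundle version of the same file.
pinned = FileInfo(path="dags/example.py", bundle_name="main", bundle_version="abc123")
callbacks[pinned].append("callback request placeholder")

# The versioned key is distinct, so it is queued alongside the existing
# entry instead of replacing it.
if pinned not in queue:
    queue.append(pinned)
```

With the version in the key, the two entries hash and compare differently, so the regular parse and the callback run stay independent.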


