ephraimbuddy commented on code in PR #45860:
URL: https://github.com/apache/airflow/pull/45860#discussion_r1926204600


##########
airflow/dag_processing/manager.py:
##########
@@ -388,18 +391,18 @@ def _fetch_callbacks(
 
     def _add_callback_to_queue(self, request: CallbackRequest):
         self.log.debug("Queuing %s CallbackRequest: %s", type(request).__name__, request)
-        self.log.warning("Callbacks are not implemented yet!")
-        # TODO: AIP-66 make callbacks bundle aware
-        return
-        self._callback_to_execute[request.full_filepath].append(request)
-        if request.full_filepath in self._file_path_queue:
-            # Remove file paths matching request.full_filepath from self._file_path_queue
+        bundle = DagBundlesManager().get_bundle(name=request.bundle_name, version=request.bundle_version)
+        dag_absolute_path = os.fspath(Path(bundle.path, request.filepath))
+        file_info = DagFileInfo(path=dag_absolute_path, bundle_name=request.bundle_name)
+        self._callback_to_execute[file_info].append(request)
+        if file_info in self._file_path_queue:
+            # Remove file paths matching request.filepath from self._file_path_queue
             # Since we are already going to use that filepath to run callback,
             # there is no need to have same file path again in the queue

Review Comment:
   > There is a reason now - we are running the callback on an old bundle 
version now, so we should leave the existing entry in the queue so the latest 
file is still parsed when it is its turn.
   
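   I have added bundle_version to DagFileInfo, so entries for different bundle
   versions are now distinct in the queue: removing the entry that matches the
   callback's file_info no longer removes the entry for the latest version. We
   can keep the removal as it is. WDYT?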
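
A rough sketch of the idea, in case it helps the discussion. `FileInfo` and `dequeue_matching` are hypothetical stand-ins written for this illustration, not the actual `DagFileInfo` or manager code, and the paths and version strings are made up. It only shows that once the version is part of the key, a callback for an older version does not match the queued entry for the latest one.

```python
from collections import deque
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class FileInfo:
    """Hypothetical stand-in for DagFileInfo; not Airflow's actual class."""

    path: str
    bundle_name: str
    bundle_version: Optional[str] = None


def dequeue_matching(queue, target):
    """Return a new queue without the entries equal to target."""
    return deque(item for item in queue if item != target)


# The queue holds the latest version of the file, waiting for its regular parse.
latest = FileInfo("/bundles/example/dags/etl.py", "example", bundle_version="v2")
queue = deque([latest])

# A callback arrives for an older version of the same file.
callback_file = FileInfo("/bundles/example/dags/etl.py", "example", bundle_version="v1")

# Equality includes bundle_version, so the older-version key does not match
# the queued entry and the latest file stays in the queue.
queue = dequeue_matching(queue, callback_file)
```

If the version were left out of the key, the two `FileInfo` values would compare equal and the removal would drop the latest file from the queue as well.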



