jedcunningham commented on code in PR #45860:
URL: https://github.com/apache/airflow/pull/45860#discussion_r1924490231


##########
airflow/dag_processing/manager.py:
##########
@@ -388,18 +391,18 @@ def _fetch_callbacks(
 
     def _add_callback_to_queue(self, request: CallbackRequest):
         self.log.debug("Queuing %s CallbackRequest: %s", type(request).__name__, request)
-        self.log.warning("Callbacks are not implemented yet!")
-        # TODO: AIP-66 make callbacks bundle aware
-        return
-        self._callback_to_execute[request.full_filepath].append(request)
-        if request.full_filepath in self._file_path_queue:
-            # Remove file paths matching request.full_filepath from self._file_path_queue
+        bundle = DagBundlesManager().get_bundle(name=request.bundle_name, version=request.bundle_version)
+        dag_absolute_path = os.fspath(Path(bundle.path, request.filepath))
+        file_info = DagFileInfo(path=dag_absolute_path, bundle_name=request.bundle_name)
+        self._callback_to_execute[file_info].append(request)
+        if file_info in self._file_path_queue:
+            # Remove file paths matching request.filepath from self._file_path_queue
             # Since we are already going to use that filepath to run callback,
             # there is no need to have same file path again in the queue

Review Comment:
   There is a reason now: we are running the callback on an old bundle version, 
so we should leave the existing entry in the queue so that the latest file is 
still parsed when its turn comes.
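
A minimal sketch of what I mean, not the real manager code. `FileKey` is a hypothetical stand-in for `DagFileInfo` (the `bundle_version` field is my assumption, it is not in this PR), and `add_callback` is an illustrative helper name:

```python
from collections import defaultdict, deque
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class FileKey:
    """Hypothetical stand-in for DagFileInfo, with an assumed version field."""

    path: str
    bundle_name: str
    bundle_version: Optional[str] = None  # None stands for "latest"


def add_callback(queue, callbacks, key, request):
    """Queue a callback without dropping the pending parse of the latest file."""
    callbacks[key].append(request)
    if key not in queue:
        # Run the callback soon, but leave every other entry untouched,
        # including the latest-version entry for the same file.
        queue.appendleft(key)


latest = FileKey("dags/a.py", "my_bundle")
old = FileKey("dags/a.py", "my_bundle", bundle_version="v1")

queue = deque([latest])
callbacks = defaultdict(list)
add_callback(queue, callbacks, old, "callback-for-v1")
```

After the call, the queue holds both the old-version entry (for the callback) and the untouched latest entry (for the regular parse).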



##########
airflow/models/dag.py:
##########
@@ -2212,15 +2213,14 @@ def safe_dag_id(self):
         return self.dag_id.replace(".", "__dot__")
 
     @property
-    def relative_fileloc(self) -> pathlib.Path | None:
+    def relative_fileloc(self) -> pathlib.Path:
         """File location of the importable dag 'file' relative to the configured DAGs folder."""

Review Comment:
   The docstring needs updating. Longer term, `fileloc` should just be relative 
from the start: #45623



##########
airflow/dag_processing/processor.py:
##########
@@ -66,17 +66,22 @@ def _parse_file_entrypoint():
     log = structlog.get_logger(logger_name="task")
 
     result = _parse_file(msg, log)
-    comms_decoder.send_request(log, result)
+    if result:
+        comms_decoder.send_request(log, result)
 
 
-def _parse_file(msg: DagFileParseRequest, log: FilteringBoundLogger) -> DagFileParsingResult:
+def _parse_file(msg: DagFileParseRequest, log: FilteringBoundLogger) -> DagFileParsingResult | None:
     # TODO: Set known_pool names on DagBag!
     bag = DagBag(
         dag_folder=msg.file,
         include_examples=False,
         safe_mode=True,
         load_op_links=False,
     )
+    if msg.callback_requests:

Review Comment:
   Probably worth leaving a comment explaining why we are short-circuiting out here.



##########
airflow/dag_processing/manager.py:
##########
@@ -647,10 +650,9 @@ def set_file_paths(self, new_file_paths: list[DagFileInfo]):
         self._file_path_queue = deque(x for x in self._file_path_queue if x in new_file_paths)
         Stats.gauge("dag_processing.file_path_queue_size", len(self._file_path_queue))
 
-        # TODO: AIP-66 make callbacks bundle aware
-        # callback_paths_to_del = [x for x in self._callback_to_execute if x not in new_file_paths]
-        # for path_to_del in callback_paths_to_del:
-        #     del self._callback_to_execute[path_to_del]
+        callback_paths_to_del = [x for x in self._callback_to_execute if x not in new_file_paths]

Review Comment:
   We will need to be a little smarter about this - the file could still exist 
in the old bundle version, but not the latest.
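
One possible shape for this, as a rough sketch under my own assumptions rather than a proposal for the actual implementation: only drop callbacks that target the latest version of a file that has disappeared, and keep the ones pinned to a specific bundle version. `FileKey`, `Request` and `prune_callbacks` are hypothetical names standing in for `DagFileInfo`, `CallbackRequest` and the cleanup in `set_file_paths`:

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class FileKey:
    """Hypothetical stand-in for DagFileInfo."""

    path: str
    bundle_name: str


@dataclass(frozen=True)
class Request:
    """Hypothetical stand-in for CallbackRequest; None means 'latest'."""

    bundle_version: Optional[str]


def prune_callbacks(callbacks, new_file_paths):
    """Return the callbacks that should survive a file list refresh."""
    current = set(new_file_paths)
    kept = {}
    for key, requests in callbacks.items():
        if key in current:
            kept[key] = list(requests)
            continue
        # The file is gone from the latest version, but requests pinned to
        # an older bundle version may still be able to find it there.
        pinned = [r for r in requests if r.bundle_version is not None]
        if pinned:
            kept[key] = pinned
    return kept


gone = FileKey("dags/removed.py", "my_bundle")
present = FileKey("dags/kept.py", "my_bundle")
pending = {
    gone: [Request(None), Request("v1")],
    present: [Request(None)],
}
result = prune_callbacks(pending, [present])
```

Here the unpinned request for the removed file is dropped, while the request pinned to `v1` is kept.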



