ephraimbuddy commented on code in PR #45860:
URL: https://github.com/apache/airflow/pull/45860#discussion_r1926204600
##########
airflow/dag_processing/manager.py:
##########
@@ -388,18 +391,18 @@ def _fetch_callbacks(
     def _add_callback_to_queue(self, request: CallbackRequest):
         self.log.debug("Queuing %s CallbackRequest: %s", type(request).__name__, request)
-        self.log.warning("Callbacks are not implemented yet!")
-        # TODO: AIP-66 make callbacks bundle aware
-        return
-        self._callback_to_execute[request.full_filepath].append(request)
-        if request.full_filepath in self._file_path_queue:
-            # Remove file paths matching request.full_filepath from self._file_path_queue
+        bundle = DagBundlesManager().get_bundle(name=request.bundle_name, version=request.bundle_version)
+        dag_absolute_path = os.fspath(Path(bundle.path, request.filepath))
+        file_info = DagFileInfo(path=dag_absolute_path, bundle_name=request.bundle_name)
+        self._callback_to_execute[file_info].append(request)
+        if file_info in self._file_path_queue:
+            # Remove file paths matching request.filepath from self._file_path_queue
             # Since we are already going to use that filepath to run callback,
             # there is no need to have same file path again in the queue
Review Comment:
> There is a reason now - we are running the callback on an old bundle
> version now, so we should leave the existing entry in the queue so the latest
> file is still parsed when it is its turn.

I have added bundle_version to DagFileInfo, which means versions will be
differentiated in the queue. We can keep it this way. WDYT?
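
To make the point concrete, here is a minimal sketch of why a version field keeps the two entries apart. It assumes `DagFileInfo` behaves like a frozen dataclass compared by value; the class below is a simplified stand-in, not the real one, and `queue_callback_file`, the path, and the version string are hypothetical names used only for illustration.

```python
from __future__ import annotations

from collections import deque
from dataclasses import dataclass


@dataclass(frozen=True)
class DagFileInfo:
    """Simplified stand-in (assumption): the real class may have other fields."""

    path: str
    bundle_name: str
    bundle_version: str | None = None


def queue_callback_file(queue: deque, file_info: DagFileInfo) -> None:
    """Hypothetical helper: drop an identical entry, then put file_info first."""
    if file_info in queue:
        # Only an entry equal in every field, including bundle_version, is removed.
        queue.remove(file_info)
    queue.appendleft(file_info)


# The latest version of the file is already waiting to be parsed.
latest = DagFileInfo(path="/bundles/example/dags/a.py", bundle_name="example")
queue = deque([latest])

# A callback arrives for an older bundle version of the same file.
old = DagFileInfo(
    path="/bundles/example/dags/a.py",
    bundle_name="example",
    bundle_version="v1",
)
queue_callback_file(queue, old)

# Both entries are present: the old-version callback did not evict the
# latest-version entry, because the two values compare unequal.
print(list(queue))
```

Under these assumptions the membership check only matches entries with the same `bundle_version`, so the latest file keeps its place in the queue and is still parsed on its turn.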