jedcunningham commented on code in PR #45860:
URL: https://github.com/apache/airflow/pull/45860#discussion_r1933140168
##########
airflow/callbacks/callback_requests.py:
##########
@@ -34,8 +34,10 @@ class BaseCallbackRequest(BaseModel):
:param msg: Additional Message that can be used for logging
"""
- full_filepath: str
+ filepath: str
"""File Path to use to run the callback"""
+ bundle_name: str
+ bundle_version: str | None = None
Review Comment:
hmm, not sure this should have a default value of None.
##########
tests/dag_processing/test_manager.py:
##########
@@ -160,33 +160,35 @@ def test_max_runs_when_no_files(self, tmp_path):
# TODO: AIP-66 no asserts?
- def test_start_new_processes_with_same_filepath(self):
+ def test_start_new_processes_with_same_filepath(self,
configure_testing_dag_bundle):
"""
Test that when a processor already exists with a filepath, a new
processor won't be created
with that filepath. The filepath will just be removed from the list.
"""
- manager = DagFileProcessorManager(max_runs=1)
+ with configure_testing_dag_bundle("/tmp"):
+ manager = DagFileProcessorManager(max_runs=1)
+ manager._dag_bundles =
list(DagBundlesManager().get_all_dag_bundles())
- file_1 = DagFileInfo(bundle_name="testing", path="file_1.py")
- file_2 = DagFileInfo(bundle_name="testing", path="file_2.py")
- file_3 = DagFileInfo(bundle_name="testing", path="file_3.py")
- manager._file_path_queue = deque([file_1, file_2, file_3])
+ file_1 = DagFileInfo(bundle_name="testing", path="/tmp/file_1.py")
+ file_2 = DagFileInfo(bundle_name="testing", path="/tmp/file_2.py")
+ file_3 = DagFileInfo(bundle_name="testing", path="/tmp/file_3.py")
+ manager._file_path_queue = deque([file_1, file_2, file_3])
# Mock that only one processor exists. This processor runs with
'file_1'
manager._processors[file_1] = MagicMock()
# Start New Processes
with mock.patch.object(DagFileProcessorManager, "_create_process"):
manager._start_new_processes()
- # Because of the config: '[scheduler] parsing_processes = 2'
- # verify that only one extra process is created
- # and since a processor with 'file_1' already exists,
- # even though it is first in '_file_path_queue'
- # a new processor is created with 'file_2' and not 'file_1'.
+ # Because of the config: '[scheduler] parsing_processes = 2'
+ # verify that only one extra process is created
+ # and since a processor with 'file_1' already exists,
+ # even though it is first in '_file_path_queue'
+ # a new processor is created with 'file_2' and not 'file_1'.
- assert file_1 in manager._processors.keys()
- assert file_2 in manager._processors.keys()
- assert deque([file_3]) == manager._file_path_queue
+ assert file_1 in manager._processors.keys()
Review Comment:
Same here — I'm not sure this should have a default value of None.
##########
airflow/triggers/base.py:
##########
@@ -215,9 +215,10 @@ def _submit_callback_if_necessary(self, *, task_instance:
TaskInstance, session)
"""Submit a callback request if the task state is SUCCESS or FAILED."""
if self.task_instance_state in (TaskInstanceState.SUCCESS,
TaskInstanceState.FAILED):
request = TaskCallbackRequest(
- full_filepath=task_instance.dag_model.fileloc,
+ filepath=task_instance.dag_model.fileloc,
ti=task_instance,
task_callback_type=self.task_instance_state,
+ bundle_name=task_instance.dag_model.bundle_name,
Review Comment:
No version for this one?
##########
tests/dag_processing/test_manager.py:
##########
@@ -160,33 +160,35 @@ def test_max_runs_when_no_files(self, tmp_path):
# TODO: AIP-66 no asserts?
- def test_start_new_processes_with_same_filepath(self):
+ def test_start_new_processes_with_same_filepath(self,
configure_testing_dag_bundle):
"""
Test that when a processor already exists with a filepath, a new
processor won't be created
with that filepath. The filepath will just be removed from the list.
"""
- manager = DagFileProcessorManager(max_runs=1)
+ with configure_testing_dag_bundle("/tmp"):
+ manager = DagFileProcessorManager(max_runs=1)
+ manager._dag_bundles =
list(DagBundlesManager().get_all_dag_bundles())
- file_1 = DagFileInfo(bundle_name="testing", path="file_1.py")
- file_2 = DagFileInfo(bundle_name="testing", path="file_2.py")
- file_3 = DagFileInfo(bundle_name="testing", path="file_3.py")
- manager._file_path_queue = deque([file_1, file_2, file_3])
+ file_1 = DagFileInfo(bundle_name="testing", path="/tmp/file_1.py")
+ file_2 = DagFileInfo(bundle_name="testing", path="/tmp/file_2.py")
+ file_3 = DagFileInfo(bundle_name="testing", path="/tmp/file_3.py")
+ manager._file_path_queue = deque([file_1, file_2, file_3])
# Mock that only one processor exists. This processor runs with
'file_1'
manager._processors[file_1] = MagicMock()
# Start New Processes
with mock.patch.object(DagFileProcessorManager, "_create_process"):
manager._start_new_processes()
- # Because of the config: '[scheduler] parsing_processes = 2'
- # verify that only one extra process is created
- # and since a processor with 'file_1' already exists,
- # even though it is first in '_file_path_queue'
- # a new processor is created with 'file_2' and not 'file_1'.
+ # Because of the config: '[scheduler] parsing_processes = 2'
Review Comment:
This doesn't need to happen in the patch context manager, right?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]