This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
     new d9e25c40f1 Fix failing tests in dag_processor/test_job_runner for db isolation mode (#41013)
d9e25c40f1 is described below

commit d9e25c40f158e6a5c516f0fd6066b945ecc52054
Author: Jarek Potiuk <ja...@potiuk.com>
AuthorDate: Wed Jul 24 22:04:03 2024 +0200

    Fix failing tests in dag_processor/test_job_runner for db isolation mode (#41013)

    This PR fixes all the tests for dag_processing in db isolation mode.
    Some of them required slight changes (for example, the session object
    is not used by internal-api components, so asserts on it won't work).
    Others required fixes in the codebase, as the tests uncovered real
    errors.

    Logging of requests/request parameters and responses at the
    internal-api server has been changed from debug to info - this is
    useful information to show while we verify that the internal API
    server works, and we can lower the level when we get close to
    releasing 2.10.0.
---
 airflow/api_internal/endpoints/rpc_api_endpoint.py |  3 ++-
 airflow/serialization/enums.py                     |  3 +++
 airflow/serialization/serialized_objects.py        | 13 +++++++++++++
 tests/dag_processing/test_job_runner.py            |  7 ++++---
 4 files changed, 22 insertions(+), 4 deletions(-)

diff --git a/airflow/api_internal/endpoints/rpc_api_endpoint.py b/airflow/api_internal/endpoints/rpc_api_endpoint.py
index c3d322f01f..4d6fc150b0 100644
--- a/airflow/api_internal/endpoints/rpc_api_endpoint.py
+++ b/airflow/api_internal/endpoints/rpc_api_endpoint.py
@@ -220,13 +220,14 @@ def internal_airflow_api(body: dict[str, Any]) -> APIResponse:
     except Exception:
         return log_and_build_error_response(message="Error deserializing parameters.", status=400)
 
-    log.debug("Calling method %s\nparams: %s", method_name, params)
+    log.info("Calling method %s\nparams: %s", method_name, params)
     try:
         # Session must be created there as it may be needed by serializer for lazy-loaded fields.
         with create_session() as session:
             output = handler(**params, session=session)
             output_json = BaseSerialization.serialize(output, use_pydantic_models=True)
             response = json.dumps(output_json) if output_json is not None else None
+            log.info("Sending response: %s", response)
             return Response(response=response, headers={"Content-Type": "application/json"})
     except Exception:
         return log_and_build_error_response(message=f"Error executing method '{method_name}'.", status=500)

diff --git a/airflow/serialization/enums.py b/airflow/serialization/enums.py
index 7f8bead6ca..7d84fea373 100644
--- a/airflow/serialization/enums.py
+++ b/airflow/serialization/enums.py
@@ -68,3 +68,6 @@ class DagAttributeTypes(str, Enum):
     CONNECTION = "connection"
     TASK_CONTEXT = "task_context"
     ARG_NOT_SET = "arg_not_set"
+    TASK_CALLBACK_REQUEST = "task_callback_request"
+    DAG_CALLBACK_REQUEST = "dag_callback_request"
+    SLA_CALLBACK_REQUEST = "sla_callback_request"

diff --git a/airflow/serialization/serialized_objects.py b/airflow/serialization/serialized_objects.py
index bb85983d95..90797d9535 100644
--- a/airflow/serialization/serialized_objects.py
+++ b/airflow/serialization/serialized_objects.py
@@ -35,6 +35,7 @@ import lazy_object_proxy
 from dateutil import relativedelta
 from pendulum.tz.timezone import FixedTimezone, Timezone
 
+from airflow.callbacks.callback_requests import DagCallbackRequest, SlaCallbackRequest, TaskCallbackRequest
 from airflow.compat.functools import cache
 from airflow.configuration import conf
 from airflow.datasets import BaseDataset, Dataset, DatasetAlias, DatasetAll, DatasetAny
@@ -676,6 +677,12 @@ class BaseSerialization:
             )
         elif isinstance(var, Connection):
             return cls._encode(var.to_dict(validate=True), type_=DAT.CONNECTION)
+        elif isinstance(var, TaskCallbackRequest):
+            return cls._encode(var.to_json(), type_=DAT.TASK_CALLBACK_REQUEST)
+        elif isinstance(var, DagCallbackRequest):
+            return cls._encode(var.to_json(), type_=DAT.DAG_CALLBACK_REQUEST)
+        elif isinstance(var, SlaCallbackRequest):
+            return cls._encode(var.to_json(), type_=DAT.SLA_CALLBACK_REQUEST)
         elif var.__class__ == Context:
             d = {}
             for k, v in var._context.items():
@@ -793,6 +800,12 @@ class BaseSerialization:
             return SimpleTaskInstance(**cls.deserialize(var))
         elif type_ == DAT.CONNECTION:
             return Connection(**var)
+        elif type_ == DAT.TASK_CALLBACK_REQUEST:
+            return TaskCallbackRequest.from_json(var)
+        elif type_ == DAT.DAG_CALLBACK_REQUEST:
+            return DagCallbackRequest.from_json(var)
+        elif type_ == DAT.SLA_CALLBACK_REQUEST:
+            return SlaCallbackRequest.from_json(var)
 elif use_pydantic_models and _ENABLE_AIP_44:
             return _type_to_class[type_][0].model_validate(var)
         elif type_ == DAT.ARG_NOT_SET:

diff --git a/tests/dag_processing/test_job_runner.py b/tests/dag_processing/test_job_runner.py
index b937a29ce2..83adc23d25 100644
--- a/tests/dag_processing/test_job_runner.py
+++ b/tests/dag_processing/test_job_runner.py
@@ -576,13 +576,12 @@ class TestDagProcessorJobRunner:
             > (freezed_base_time - manager.processor.get_last_finish_time("file_1.py")).total_seconds()
         )
 
-    @mock.patch("sqlalchemy.orm.session.Session.delete")
     @mock.patch("zipfile.is_zipfile", return_value=True)
     @mock.patch("airflow.utils.file.might_contain_dag", return_value=True)
     @mock.patch("airflow.utils.file.find_path_from_directory", return_value=True)
     @mock.patch("airflow.utils.file.os.path.isfile", return_value=True)
     def test_file_paths_in_queue_sorted_by_priority(
-        self, mock_isfile, mock_find_path, mock_might_contain_dag, mock_zipfile, session_delete
+        self, mock_isfile, mock_find_path, mock_might_contain_dag, mock_zipfile
     ):
         from airflow.models.dagbag import DagPriorityParsingRequest
 
@@ -614,7 +613,9 @@ class TestDagProcessorJobRunner:
         assert manager.processor._file_path_queue == deque(
             ["file_1.py", "file_2.py", "file_3.py", "file_4.py"]
         )
-        assert session_delete.call_args[0][0].fileloc == parsing_request.fileloc
+        with create_session() as session2:
+            parsing_request_after = session2.query(DagPriorityParsingRequest).get(parsing_request.id)
+            assert parsing_request_after is None
 
     def test_scan_stale_dags(self):
         """
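
A minimal sketch of the round-trip the serialized_objects.py change above
enables, so that callback requests can cross the internal API boundary in db
isolation mode. Only to_json()/from_json() and the use_pydantic_models flag
come from the diff itself; the DagCallbackRequest constructor arguments shown
are assumptions (hypothetical values) based on the Airflow 2.x API:

    # Sketch only, not part of the commit; all argument values are hypothetical.
    from airflow.callbacks.callback_requests import DagCallbackRequest
    from airflow.serialization.serialized_objects import BaseSerialization

    request = DagCallbackRequest(
        full_filepath="/dags/example_dag.py",   # hypothetical DAG file path
        dag_id="example_dag",                   # hypothetical DAG id
        run_id="manual__2024-07-24T00:00:00",   # hypothetical run id
        processor_subdir=None,
        is_failure_callback=True,
        msg="task failed",
    )

    # serialize() now routes the request through var.to_json(), wrapping the
    # result in an envelope tagged with the new DAT.DAG_CALLBACK_REQUEST value.
    encoded = BaseSerialization.serialize(request, use_pydantic_models=True)

    # deserialize() dispatches on that tag and rebuilds the object via from_json().
    decoded = BaseSerialization.deserialize(encoded, use_pydantic_models=True)
    assert decoded.dag_id == request.dag_id

The same pattern applies to TaskCallbackRequest and SlaCallbackRequest, each
tagged with its own new DagAttributeTypes entry.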