This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new d9e25c40f1 Fix failing tests in dag_processor/test_job_runner for db isolation mode (#41013)
d9e25c40f1 is described below

commit d9e25c40f158e6a5c516f0fd6066b945ecc52054
Author: Jarek Potiuk <ja...@potiuk.com>
AuthorDate: Wed Jul 24 22:04:03 2024 +0200

    Fix failing tests in dag_processor/test_job_runner for db isolation mode (#41013)
    
    This PR fixes all the tests for dag_processing for db isolation mode.
    
    Some of them required slight changes (for example, the session object
    is not used by internal-api components, so asserts on it won't work).
    
    Some of them required fixes in the codebase as they found real errors.
    
    Logging of request parameters and responses at the internal-api server
    has been changed from debug to info - this is useful information to show
    how the internal API server works, and we can lower the level when we are
    getting close to releasing 2.10.0.
---
 airflow/api_internal/endpoints/rpc_api_endpoint.py |  3 ++-
 airflow/serialization/enums.py                     |  3 +++
 airflow/serialization/serialized_objects.py        | 13 +++++++++++++
 tests/dag_processing/test_job_runner.py            |  7 ++++---
 4 files changed, 22 insertions(+), 4 deletions(-)

diff --git a/airflow/api_internal/endpoints/rpc_api_endpoint.py b/airflow/api_internal/endpoints/rpc_api_endpoint.py
index c3d322f01f..4d6fc150b0 100644
--- a/airflow/api_internal/endpoints/rpc_api_endpoint.py
+++ b/airflow/api_internal/endpoints/rpc_api_endpoint.py
@@ -220,13 +220,14 @@ def internal_airflow_api(body: dict[str, Any]) -> APIResponse:
     except Exception:
         return log_and_build_error_response(message="Error deserializing parameters.", status=400)
 
-    log.debug("Calling method %s\nparams: %s", method_name, params)
+    log.info("Calling method %s\nparams: %s", method_name, params)
     try:
         # Session must be created there as it may be needed by serializer for lazy-loaded fields.
         with create_session() as session:
             output = handler(**params, session=session)
             output_json = BaseSerialization.serialize(output, use_pydantic_models=True)
             response = json.dumps(output_json) if output_json is not None else None
+            log.info("Sending response: %s", response)
             return Response(response=response, headers={"Content-Type": "application/json"})
     except Exception:
         return log_and_build_error_response(message=f"Error executing method '{method_name}'.", status=500)
diff --git a/airflow/serialization/enums.py b/airflow/serialization/enums.py
index 7f8bead6ca..7d84fea373 100644
--- a/airflow/serialization/enums.py
+++ b/airflow/serialization/enums.py
@@ -68,3 +68,6 @@ class DagAttributeTypes(str, Enum):
     CONNECTION = "connection"
     TASK_CONTEXT = "task_context"
     ARG_NOT_SET = "arg_not_set"
+    TASK_CALLBACK_REQUEST = "task_callback_request"
+    DAG_CALLBACK_REQUEST = "dag_callback_request"
+    SLA_CALLBACK_REQUEST = "sla_callback_request"
diff --git a/airflow/serialization/serialized_objects.py b/airflow/serialization/serialized_objects.py
index bb85983d95..90797d9535 100644
--- a/airflow/serialization/serialized_objects.py
+++ b/airflow/serialization/serialized_objects.py
@@ -35,6 +35,7 @@ import lazy_object_proxy
 from dateutil import relativedelta
 from pendulum.tz.timezone import FixedTimezone, Timezone
 
+from airflow.callbacks.callback_requests import DagCallbackRequest, SlaCallbackRequest, TaskCallbackRequest
 from airflow.compat.functools import cache
 from airflow.configuration import conf
 from airflow.datasets import BaseDataset, Dataset, DatasetAlias, DatasetAll, DatasetAny
@@ -676,6 +677,12 @@ class BaseSerialization:
             )
         elif isinstance(var, Connection):
             return cls._encode(var.to_dict(validate=True), type_=DAT.CONNECTION)
+        elif isinstance(var, TaskCallbackRequest):
+            return cls._encode(var.to_json(), type_=DAT.TASK_CALLBACK_REQUEST)
+        elif isinstance(var, DagCallbackRequest):
+            return cls._encode(var.to_json(), type_=DAT.DAG_CALLBACK_REQUEST)
+        elif isinstance(var, SlaCallbackRequest):
+            return cls._encode(var.to_json(), type_=DAT.SLA_CALLBACK_REQUEST)
         elif var.__class__ == Context:
             d = {}
             for k, v in var._context.items():
@@ -793,6 +800,12 @@ class BaseSerialization:
             return SimpleTaskInstance(**cls.deserialize(var))
         elif type_ == DAT.CONNECTION:
             return Connection(**var)
+        elif type_ == DAT.TASK_CALLBACK_REQUEST:
+            return TaskCallbackRequest.from_json(var)
+        elif type_ == DAT.DAG_CALLBACK_REQUEST:
+            return DagCallbackRequest.from_json(var)
+        elif type_ == DAT.SLA_CALLBACK_REQUEST:
+            return SlaCallbackRequest.from_json(var)
         elif use_pydantic_models and _ENABLE_AIP_44:
             return _type_to_class[type_][0].model_validate(var)
         elif type_ == DAT.ARG_NOT_SET:
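
The branches above let callback requests round-trip through BaseSerialization,
which is what the internal-api RPC endpoint relies on when it serializes
handler results. A minimal sketch of that round trip follows; the
SlaCallbackRequest constructor arguments are illustrative assumptions, not
taken from this commit:

    from airflow.callbacks.callback_requests import SlaCallbackRequest
    from airflow.serialization.serialized_objects import BaseSerialization

    # Constructor arguments assumed for illustration only.
    request = SlaCallbackRequest(
        full_filepath="/files/dags/example_dag.py",
        dag_id="example_dag",
    )

    # serialize() tags the JSON payload with DAT.SLA_CALLBACK_REQUEST ...
    encoded = BaseSerialization.serialize(request, use_pydantic_models=True)
    # ... and deserialize() dispatches on that tag and rebuilds the request
    # via SlaCallbackRequest.from_json().
    decoded = BaseSerialization.deserialize(encoded)
    assert decoded.dag_id == request.dag_id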
diff --git a/tests/dag_processing/test_job_runner.py b/tests/dag_processing/test_job_runner.py
index b937a29ce2..83adc23d25 100644
--- a/tests/dag_processing/test_job_runner.py
+++ b/tests/dag_processing/test_job_runner.py
@@ -576,13 +576,12 @@ class TestDagProcessorJobRunner:
                 > (freezed_base_time - manager.processor.get_last_finish_time("file_1.py")).total_seconds()
             )
 
-    @mock.patch("sqlalchemy.orm.session.Session.delete")
     @mock.patch("zipfile.is_zipfile", return_value=True)
     @mock.patch("airflow.utils.file.might_contain_dag", return_value=True)
     @mock.patch("airflow.utils.file.find_path_from_directory", 
return_value=True)
     @mock.patch("airflow.utils.file.os.path.isfile", return_value=True)
     def test_file_paths_in_queue_sorted_by_priority(
-        self, mock_isfile, mock_find_path, mock_might_contain_dag, mock_zipfile, session_delete
+        self, mock_isfile, mock_find_path, mock_might_contain_dag, mock_zipfile
     ):
         from airflow.models.dagbag import DagPriorityParsingRequest
 
@@ -614,7 +613,9 @@ class TestDagProcessorJobRunner:
         assert manager.processor._file_path_queue == deque(
             ["file_1.py", "file_2.py", "file_3.py", "file_4.py"]
         )
-        assert session_delete.call_args[0][0].fileloc == parsing_request.fileloc
+        with create_session() as session2:
+            parsing_request_after = session2.query(DagPriorityParsingRequest).get(parsing_request.id)
+        assert parsing_request_after is None
 
     def test_scan_stale_dags(self):
         """
