This is an automated email from the ASF dual-hosted git repository.

vatsrahul1001 pushed a commit to branch v3-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 5357677bf4bdd97da47b5a1d70207952994917df
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Fri May 29 20:07:11 2026 +0530

    [v3-2-test] Cache BaseOperator signature in OperatorSerialization (#67701) (#67708)
    
    * [v3-2-test] Cache BaseOperator signature in OperatorSerialization (#67701)
    (cherry picked from commit 5799f9d672b80d01556b70eace88238d7b61665c)
    
    Co-authored-by: Shahar Epstein <[email protected]>
    
    * Fix serialization unit tests to avoid ORM side effects in --skip-db mode (#65206)
    
    (cherry picked from commit 2007bff00b14fd82dff08ce78649681e9a867d1b)
    
    ---------
    
    Co-authored-by: Shahar Epstein <[email protected]>
    Co-authored-by: Pavel Grakhov <[email protected]>
---
 .../airflow/serialization/serialized_objects.py    | 10 ++--
 .../unit/serialization/test_serialized_objects.py  | 54 +++++++++-------------
 2 files changed, 28 insertions(+), 36 deletions(-)

diff --git a/airflow-core/src/airflow/serialization/serialized_objects.py b/airflow-core/src/airflow/serialization/serialized_objects.py
index a0295dc2370..351a0b6cf42 100644
--- a/airflow-core/src/airflow/serialization/serialized_objects.py
+++ b/airflow-core/src/airflow/serialization/serialized_objects.py
@@ -966,6 +966,12 @@ class OperatorSerialization(DAGNode, BaseSerialization):
 
     _const_fields: ClassVar[set[str] | None] = None
 
+    # Parameters of BaseOperator.__init__ that must not appear in template_fields.
+    # Computed once at class-load time: the signature never changes during a process.
+    _FORBIDDEN_TEMPLATE_FIELDS: ClassVar[frozenset[str]] = frozenset(
+        signature(BaseOperator.__init__).parameters
+    ) - {"email"}
+
     @classmethod
     def serialize_mapped_operator(cls, op: MappedOperator) -> dict[str, Any]:
         serialized_op = cls._serialize_node(op)
@@ -1046,9 +1052,7 @@ class OperatorSerialization(DAGNode, BaseSerialization):
         # Store all template_fields as they are if there are JSON Serializable
         # If not, store them as strings
         # And raise an exception if the field is not templateable
-        forbidden_fields = set(signature(BaseOperator.__init__).parameters.keys())
-        # Though allow some of the BaseOperator fields to be templated anyway
-        forbidden_fields.difference_update({"email"})
+        forbidden_fields = cls._FORBIDDEN_TEMPLATE_FIELDS
         if op.template_fields:
             for template_field in op.template_fields:
                 if template_field in forbidden_fields:
diff --git a/airflow-core/tests/unit/serialization/test_serialized_objects.py b/airflow-core/tests/unit/serialization/test_serialized_objects.py
index c117561273b..06754923137 100644
--- a/airflow-core/tests/unit/serialization/test_serialized_objects.py
+++ b/airflow-core/tests/unit/serialization/test_serialized_objects.py
@@ -32,6 +32,7 @@ from pendulum.tz.timezone import FixedTimezone, Timezone
 from uuid6 import uuid7
 
 from airflow._shared.timezones import timezone
+from airflow.api_fastapi.execution_api.datamodels import taskinstance as ti_datamodel
 from airflow.callbacks.callback_requests import DagCallbackRequest, TaskCallbackRequest
 from airflow.exceptions import (
     AirflowException,
@@ -42,8 +43,6 @@ from airflow.exceptions import (
 )
 from airflow.models.connection import Connection
 from airflow.models.dag import DAG
-from airflow.models.dagrun import DagRun
-from airflow.models.taskinstance import TaskInstance
 from airflow.models.xcom_arg import XComArg
 from airflow.partition_mappers.identity import IdentityMapper as CoreIdentityMapper
 from airflow.partition_mappers.temporal import (
@@ -104,12 +103,9 @@ from airflow.serialization.serialized_objects import (
     DagSerialization,
     LazyDeserializedDAG,
     _has_kubernetes,
-    create_scheduler_operator,
 )
 from airflow.triggers.base import BaseTrigger
 from airflow.utils.db import LazySelectSequence
-from airflow.utils.state import DagRunState, State
-from airflow.utils.types import DagRunType
 
 from unit.models import DEFAULT_DATE
 
@@ -225,32 +221,15 @@ def test_serde_validate_schema_valid_json():
     assert t.obj == {"foo": "bar"}
 
 
-TI = TaskInstance(
-    task=create_scheduler_operator(EmptyOperator(task_id="test-task")),
+TASK_CALLBACK_TI = ti_datamodel.TaskInstance(
+    id=uuid7(),
+    task_id="test-task",
+    dag_id="test-dag",
     run_id="fake_run",
-    state=State.RUNNING,
+    try_number=1,
     dag_version_id=uuid7(),
 )
 
-TI_WITH_START_DAY = TaskInstance(
-    task=create_scheduler_operator(EmptyOperator(task_id="test-task")),
-    run_id="fake_run",
-    state=State.RUNNING,
-    dag_version_id=uuid7(),
-)
-TI_WITH_START_DAY.start_date = timezone.datetime(2020, 1, 1, 0, 0, 0)
-
-DAG_RUN = DagRun(
-    dag_id="test_dag_id",
-    run_id="test_dag_run_id",
-    run_type=DagRunType.MANUAL,
-    logical_date=timezone.utcnow(),
-    start_date=timezone.utcnow(),
-    state=DagRunState.SUCCESS,
-)
-DAG_RUN.id = 1
-
-
 # we add the tasks out of order, to ensure they are deserialized in the correct order
 DAG_WITH_TASKS = DAG(dag_id="test_dag", start_date=datetime.now())
 EmptyOperator(task_id="task2", dag=DAG_WITH_TASKS)
@@ -400,15 +379,10 @@ class MockLazySelectSequence(LazySelectSequence):
             DAT.ASSET,
             equal_serialized_asset,
         ),
-        (
-            Connection(conn_id="TEST_ID", uri="mysql://"),
-            DAT.CONNECTION,
-            lambda a, b: a.get_uri() == b.get_uri(),
-        ),
         (
             TaskCallbackRequest(
                 filepath="filepath",
-                ti=TI,
+                ti=TASK_CALLBACK_TI,
                 bundle_name="testing",
                 bundle_version=None,
             ),
@@ -504,6 +478,19 @@ def test_serialize_deserialize(input, encoded_type, cmp_func):
     json.dumps(serialized)  # does not raise
 
 
+@pytest.mark.db_test
+def test_serialize_deserialize_connection():
+    from airflow.serialization.serialized_objects import BaseSerialization
+
+    connection = Connection(conn_id="TEST_ID", uri="mysql://")
+    serialized = BaseSerialization.serialize(connection)
+    json.dumps(serialized)
+    assert serialized[Encoding.TYPE] == DAT.CONNECTION
+
+    deserialized = BaseSerialization.deserialize(serialized)
+    assert deserialized.get_uri() == connection.get_uri()
+
+
 @pytest.mark.parametrize("reference", REFERENCE_TYPES)
 def test_serialize_deserialize_deadline_alert(reference):
     public_deadline_alert_fields = {
@@ -541,6 +528,7 @@ def test_serialize_deserialize_deadline_alert(reference):
         ),
     ],
 )
+@pytest.mark.db_test
 def test_backcompat_deserialize_connection(conn_uri):
     """Test deserialize connection which serialised by previous serializer 
implementation."""
     from airflow.serialization.serialized_objects import BaseSerialization

Reply via email to