This is an automated email from the ASF dual-hosted git repository.

jscheffl pushed a commit to branch v2-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/v2-10-test by this push:
     new a871ab35dce allow to set note field via internal api (#47769)
a871ab35dce is described below

commit a871ab35dcea6f38b62d84b1187d7d6957f889d8
Author: AutomationDev85 <[email protected]>
AuthorDate: Fri Mar 21 21:33:55 2025 +0100

    allow to set note field via internal api (#47769)
    
    Co-authored-by: AutomationDev85 <AutomationDev85>
---
 airflow/models/taskinstance.py                 |  4 ++
 airflow/serialization/pydantic/taskinstance.py |  1 +
 tests/models/test_taskinstance.py              | 65 ++++++++++++++++++++++++++
 3 files changed, 70 insertions(+)

diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py
index 805194ad00e..9aebd40f6c9 100644
--- a/airflow/models/taskinstance.py
+++ b/airflow/models/taskinstance.py
@@ -801,6 +801,8 @@ def _execute_task(task_instance: TaskInstance | TaskInstancePydantic, context: C
 
 
 def _set_ti_attrs(target, source, include_dag_run=False):
+    from airflow.serialization.pydantic.taskinstance import TaskInstancePydantic
+
     # Fields ordered per model definition
     target.start_date = source.start_date
     target.end_date = source.end_date
@@ -826,6 +828,8 @@ def _set_ti_attrs(target, source, include_dag_run=False):
     target.trigger_id = source.trigger_id
     target.next_method = source.next_method
     target.next_kwargs = source.next_kwargs
+    if source.note and isinstance(source, TaskInstancePydantic):
+        target.note = source.note
 
     if include_dag_run:
         target.execution_date = source.execution_date
diff --git a/airflow/serialization/pydantic/taskinstance.py b/airflow/serialization/pydantic/taskinstance.py
index e89d21cd98d..5958c9d5397 100644
--- a/airflow/serialization/pydantic/taskinstance.py
+++ b/airflow/serialization/pydantic/taskinstance.py
@@ -123,6 +123,7 @@ class TaskInstancePydantic(BaseModelPydantic, LoggingMixin):
     dag_model: Optional[DagModelPydantic]
     raw: Optional[bool]
     is_trigger_log_context: Optional[bool]
+    note: Optional[str] = None
     model_config = ConfigDict(from_attributes=True, arbitrary_types_allowed=True)
 
     @property
diff --git a/tests/models/test_taskinstance.py b/tests/models/test_taskinstance.py
index bb877704339..f7a2f3b4144 100644
--- a/tests/models/test_taskinstance.py
+++ b/tests/models/test_taskinstance.py
@@ -84,6 +84,7 @@ from airflow.operators.empty import EmptyOperator
 from airflow.operators.python import BranchPythonOperator, PythonOperator
 from airflow.sensors.base import BaseSensorOperator
 from airflow.sensors.python import PythonSensor
+from airflow.serialization.pydantic.taskinstance import TaskInstancePydantic
 from airflow.serialization.serialized_objects import SerializedBaseOperator, SerializedDAG
 from airflow.settings import TIMEZONE, TracebackSessionForTests, reconfigure_orm
 from airflow.stats import Stats
@@ -5421,6 +5422,70 @@ def test_taskinstance_with_note(create_task_instance, session):
     assert session.query(TaskInstanceNote).filter_by(**filter_kwargs).one_or_none() is None
 
 
+def test_taskinstance_with_note_pydantic(create_task_instance, session):
+    ti = create_task_instance(
+        dag_id="dag_for_testing_with_note_pydantic",
+        task_id="task_for_testing_with_note_pydantic",
+        run_type=DagRunType.SCHEDULED,
+        execution_date=DEFAULT_DATE,
+    )
+
+    ti_pydantic = TaskInstancePydantic(
+        task_id=ti.task_id,
+        dag_id=ti.dag_id,
+        run_id=ti.run_id,
+        map_index=ti.map_index,
+        start_date=ti.start_date,
+        end_date=ti.end_date,
+        execution_date=ti.execution_date,
+        duration=0.1,
+        state="success",
+        try_number=ti.try_number,
+        max_tries=ti.max_tries,
+        hostname="host",
+        unixname="unix",
+        job_id=ti.job_id,
+        pool=ti.pool,
+        pool_slots=ti.pool_slots,
+        queue=ti.queue,
+        priority_weight=ti.priority_weight,
+        operator=ti.operator,
+        custom_operator_name=ti.custom_operator_name,
+        queued_dttm=timezone.utcnow(),
+        queued_by_job_id=3,
+        pid=12345,
+        executor=ti.executor,
+        executor_config=None,
+        updated_at=timezone.utcnow(),
+        rendered_map_index=ti.rendered_map_index,
+        external_executor_id="x",
+        trigger_id=ti.trigger_id,
+        trigger_timeout=timezone.utcnow(),
+        next_method="bla",
+        next_kwargs=None,
+        run_as_user=None,
+        task=ti.task,
+        test_mode=False,
+        dag_run=ti.dag_run,
+        dag_model=ti.dag_model,
+        raw=False,
+        is_trigger_log_context=False,
+        note="ti with note",
+    )
+
+    TaskInstance.save_to_db(ti_pydantic, session)
+
+    filter_kwargs = dict(
+        dag_id=ti_pydantic.dag_id,
+        task_id=ti_pydantic.task_id,
+        run_id=ti_pydantic.run_id,
+        map_index=ti_pydantic.map_index,
+    )
+
+    ti_note: TaskInstanceNote = session.query(TaskInstanceNote).filter_by(**filter_kwargs).one()
+    assert ti_note.content == "ti with note"
+
+
 def test__refresh_from_db_should_not_increment_try_number(dag_maker, session):
     with dag_maker():
         BashOperator(task_id="hello", bash_command="hi")

Reply via email to