This is an automated email from the ASF dual-hosted git repository.
jscheffl pushed a commit to branch v2-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/v2-10-test by this push:
new a871ab35dce allow to set note field via internal api (#47769)
a871ab35dce is described below
commit a871ab35dcea6f38b62d84b1187d7d6957f889d8
Author: AutomationDev85 <[email protected]>
AuthorDate: Fri Mar 21 21:33:55 2025 +0100
allow to set note field via internal api (#47769)
Co-authored-by: AutomationDev85 <AutomationDev85>
---
airflow/models/taskinstance.py | 4 ++
airflow/serialization/pydantic/taskinstance.py | 1 +
tests/models/test_taskinstance.py | 65 ++++++++++++++++++++++++++
3 files changed, 70 insertions(+)
diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py
index 805194ad00e..9aebd40f6c9 100644
--- a/airflow/models/taskinstance.py
+++ b/airflow/models/taskinstance.py
@@ -801,6 +801,8 @@ def _execute_task(task_instance: TaskInstance |
TaskInstancePydantic, context: C
def _set_ti_attrs(target, source, include_dag_run=False):
+ from airflow.serialization.pydantic.taskinstance import
TaskInstancePydantic
+
# Fields ordered per model definition
target.start_date = source.start_date
target.end_date = source.end_date
@@ -826,6 +828,8 @@ def _set_ti_attrs(target, source, include_dag_run=False):
target.trigger_id = source.trigger_id
target.next_method = source.next_method
target.next_kwargs = source.next_kwargs
+ if source.note and isinstance(source, TaskInstancePydantic):
+ target.note = source.note
if include_dag_run:
target.execution_date = source.execution_date
diff --git a/airflow/serialization/pydantic/taskinstance.py
b/airflow/serialization/pydantic/taskinstance.py
index e89d21cd98d..5958c9d5397 100644
--- a/airflow/serialization/pydantic/taskinstance.py
+++ b/airflow/serialization/pydantic/taskinstance.py
@@ -123,6 +123,7 @@ class TaskInstancePydantic(BaseModelPydantic, LoggingMixin):
dag_model: Optional[DagModelPydantic]
raw: Optional[bool]
is_trigger_log_context: Optional[bool]
+ note: Optional[str] = None
model_config = ConfigDict(from_attributes=True,
arbitrary_types_allowed=True)
@property
diff --git a/tests/models/test_taskinstance.py
b/tests/models/test_taskinstance.py
index bb877704339..f7a2f3b4144 100644
--- a/tests/models/test_taskinstance.py
+++ b/tests/models/test_taskinstance.py
@@ -84,6 +84,7 @@ from airflow.operators.empty import EmptyOperator
from airflow.operators.python import BranchPythonOperator, PythonOperator
from airflow.sensors.base import BaseSensorOperator
from airflow.sensors.python import PythonSensor
+from airflow.serialization.pydantic.taskinstance import TaskInstancePydantic
from airflow.serialization.serialized_objects import SerializedBaseOperator,
SerializedDAG
from airflow.settings import TIMEZONE, TracebackSessionForTests,
reconfigure_orm
from airflow.stats import Stats
@@ -5421,6 +5422,70 @@ def test_taskinstance_with_note(create_task_instance,
session):
assert
session.query(TaskInstanceNote).filter_by(**filter_kwargs).one_or_none() is None
+def test_taskinstance_with_note_pydantic(create_task_instance, session):
+ ti = create_task_instance(
+ dag_id="dag_for_testing_with_note_pydantic",
+ task_id="task_for_testing_with_note_pydantic",
+ run_type=DagRunType.SCHEDULED,
+ execution_date=DEFAULT_DATE,
+ )
+
+ ti_pydantic = TaskInstancePydantic(
+ task_id=ti.task_id,
+ dag_id=ti.dag_id,
+ run_id=ti.run_id,
+ map_index=ti.map_index,
+ start_date=ti.start_date,
+ end_date=ti.end_date,
+ execution_date=ti.execution_date,
+ duration=0.1,
+ state="success",
+ try_number=ti.try_number,
+ max_tries=ti.max_tries,
+ hostname="host",
+ unixname="unix",
+ job_id=ti.job_id,
+ pool=ti.pool,
+ pool_slots=ti.pool_slots,
+ queue=ti.queue,
+ priority_weight=ti.priority_weight,
+ operator=ti.operator,
+ custom_operator_name=ti.custom_operator_name,
+ queued_dttm=timezone.utcnow(),
+ queued_by_job_id=3,
+ pid=12345,
+ executor=ti.executor,
+ executor_config=None,
+ updated_at=timezone.utcnow(),
+ rendered_map_index=ti.rendered_map_index,
+ external_executor_id="x",
+ trigger_id=ti.trigger_id,
+ trigger_timeout=timezone.utcnow(),
+ next_method="bla",
+ next_kwargs=None,
+ run_as_user=None,
+ task=ti.task,
+ test_mode=False,
+ dag_run=ti.dag_run,
+ dag_model=ti.dag_model,
+ raw=False,
+ is_trigger_log_context=False,
+ note="ti with note",
+ )
+
+ TaskInstance.save_to_db(ti_pydantic, session)
+
+ filter_kwargs = dict(
+ dag_id=ti_pydantic.dag_id,
+ task_id=ti_pydantic.task_id,
+ run_id=ti_pydantic.run_id,
+ map_index=ti_pydantic.map_index,
+ )
+
+ ti_note: TaskInstanceNote =
session.query(TaskInstanceNote).filter_by(**filter_kwargs).one()
+ assert ti_note.content == "ti with note"
+
+
def test__refresh_from_db_should_not_increment_try_number(dag_maker, session):
with dag_maker():
BashOperator(task_id="hello", bash_command="hi")