This is an automated email from the ASF dual-hosted git repository.

mobuchowski pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new cc4ac7784de feat: Add DagRun note to OL events (#62347)
cc4ac7784de is described below

commit cc4ac7784de4608fd067e13b6fdd0addc2a90b84
Author: Kacper Muda <[email protected]>
AuthorDate: Mon Mar 2 21:01:14 2026 +0100

    feat: Add DagRun note to OL events (#62347)
---
 .../airflow/providers/openlineage/utils/utils.py   |  8 +++++++-
 .../providers/openlineage/version_compat.py        |  3 ++-
 .../tests/unit/openlineage/utils/test_utils.py     | 22 +++++++++++++++++++++-
 3 files changed, 30 insertions(+), 3 deletions(-)

diff --git 
a/providers/openlineage/src/airflow/providers/openlineage/utils/utils.py 
b/providers/openlineage/src/airflow/providers/openlineage/utils/utils.py
index ed8667ffab0..bfc9f566fcb 100644
--- a/providers/openlineage/src/airflow/providers/openlineage/utils/utils.py
+++ b/providers/openlineage/src/airflow/providers/openlineage/utils/utils.py
@@ -60,7 +60,11 @@ from airflow.providers.openlineage.utils.selective_enable 
import (
     is_dag_lineage_enabled,
     is_task_lineage_enabled,
 )
-from airflow.providers.openlineage.version_compat import AIRFLOW_V_3_0_PLUS, 
get_base_airflow_version_tuple
+from airflow.providers.openlineage.version_compat import (
+    AIRFLOW_V_3_0_PLUS,
+    AIRFLOW_V_3_2_PLUS,
+    get_base_airflow_version_tuple,
+)
 from airflow.serialization.serialized_objects import SerializedBaseOperator, 
SerializedDAG
 
 try:
@@ -749,6 +753,7 @@ class DagRunInfo(InfoJsonEncodable):
     ]
 
     casts = {
+        "note": lambda dagrun: getattr(dagrun, "note", None) if 
AIRFLOW_V_3_2_PLUS else None,
         "duration": lambda dagrun: DagRunInfo.duration(dagrun),
         "dag_bundle_name": lambda dagrun: DagRunInfo.dag_version_info(dagrun, 
"bundle_name"),
         "dag_bundle_version": lambda dagrun: 
DagRunInfo.dag_version_info(dagrun, "bundle_version"),
@@ -837,6 +842,7 @@ class TaskInfo(InfoJsonEncodable):
         # Operator-specific useful attributes
         "trigger_dag_id",  # TriggerDagRunOperator
         "trigger_run_id",  # TriggerDagRunOperator
+        "note",  # TriggerDagRunOperator
         "external_dag_id",  # ExternalTaskSensor and ExternalTaskMarker (if 
run, as it's EmptyOperator)
         "external_task_id",  # ExternalTaskSensor and ExternalTaskMarker (if 
run, as it's EmptyOperator)
         "external_task_ids",  # ExternalTaskSensor
diff --git 
a/providers/openlineage/src/airflow/providers/openlineage/version_compat.py 
b/providers/openlineage/src/airflow/providers/openlineage/version_compat.py
index 4a2c6ca5c6c..114631640bc 100644
--- a/providers/openlineage/src/airflow/providers/openlineage/version_compat.py
+++ b/providers/openlineage/src/airflow/providers/openlineage/version_compat.py
@@ -33,6 +33,7 @@ def get_base_airflow_version_tuple() -> tuple[int, int, int]:
 
 
 AIRFLOW_V_3_0_PLUS = get_base_airflow_version_tuple() >= (3, 0, 0)
+AIRFLOW_V_3_2_PLUS = get_base_airflow_version_tuple() >= (3, 2, 0)
 
 
-__all__ = ["AIRFLOW_V_3_0_PLUS"]
+__all__ = ["AIRFLOW_V_3_0_PLUS", "AIRFLOW_V_3_2_PLUS"]
diff --git a/providers/openlineage/tests/unit/openlineage/utils/test_utils.py 
b/providers/openlineage/tests/unit/openlineage/utils/test_utils.py
index 4e27cdcf486..4e76a795bb8 100644
--- a/providers/openlineage/tests/unit/openlineage/utils/test_utils.py
+++ b/providers/openlineage/tests/unit/openlineage/utils/test_utils.py
@@ -80,7 +80,11 @@ from airflow.utils.types import DagRunType
 from tests_common.test_utils.compat import BashOperator, 
OperatorSerialization, PythonOperator
 from tests_common.test_utils.mock_operators import MockOperator
 from tests_common.test_utils.taskinstance import create_task_instance
-from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_3_PLUS, 
AIRFLOW_V_3_0_PLUS
+from tests_common.test_utils.version_compat import (
+    AIRFLOW_V_3_0_3_PLUS,
+    AIRFLOW_V_3_0_PLUS,
+    AIRFLOW_V_3_2_PLUS,
+)
 
 BASH_OPERATOR_PATH = "airflow.providers.standard.operators.bash"
 PYTHON_OPERATOR_PATH = "airflow.providers.standard.operators.python"
@@ -185,6 +189,7 @@ def test_get_airflow_dag_run_facet():
     dagrun_mock.end_date = datetime.datetime(2024, 6, 1, 1, 2, 14, 34172, 
tzinfo=datetime.timezone.utc)
     dagrun_mock.triggering_user_name = "user1"
     dagrun_mock.triggered_by = "something"
+    dagrun_mock.note = "note"
     dagrun_mock.dag_versions = [
         MagicMock(
             bundle_name="bundle_name",
@@ -209,6 +214,9 @@ def test_get_airflow_dag_run_facet():
     }
     if hasattr(dag, "schedule_interval"):  # Airflow 2 compat.
         expected_dag_info["schedule_interval"] = "@once"
+    note: str | None = None
+    if AIRFLOW_V_3_2_PLUS:
+        note = "note"
     assert result == {
         "airflowDagRun": AirflowDagRunFacet(
             dag=expected_dag_info,
@@ -233,6 +241,7 @@ def test_get_airflow_dag_run_facet():
                 "dag_version_number": "version_number",
                 "triggering_user_name": "user1",
                 "triggered_by": "something",
+                "note": note,
             },
         )
     }
@@ -2665,6 +2674,10 @@ def test_dagrun_info_af3(mocked_dag_versions):
     dv2.bundle_name = "bundle_name"
     dv2.bundle_version = "bundle_version"
 
+    optional_args = {}
+    if AIRFLOW_V_3_2_PLUS:
+        optional_args["note"] = "note"
+
     mocked_dag_versions.return_value = [dv1, dv2]
     dagrun = DagRun(
         dag_id="dag_id",
@@ -2681,6 +2694,7 @@ def test_dagrun_info_af3(mocked_dag_versions):
         triggered_by=DagRunTriggeredByType.UI,
         backfill_id=999,
         bundle_version="bundle_version",
+        **optional_args,
     )
     assert dagrun.dag_versions == [dv1, dv2]
     dagrun.end_date = date + datetime.timedelta(seconds=74, microseconds=546)
@@ -2706,6 +2720,7 @@ def test_dagrun_info_af3(mocked_dag_versions):
         "dag_version_number": "version_number",
         "triggered_by": DagRunTriggeredByType.UI,
         "triggering_user_name": "my_user",
+        "note": optional_args.get("note"),
     }
 
 
@@ -2748,6 +2763,7 @@ def test_dagrun_info_af2():
         "dag_bundle_version": None,
         "dag_version_id": None,
         "dag_version_number": None,
+        "note": None,
     }
 
 
@@ -2852,6 +2868,7 @@ def test_task_info_af3():
             self.tol = "tol"  # SQLValueCheckOperator
             self.trigger_dag_id = "trigger_dag_id"  # TriggerDagRunOperator
             self.trigger_run_id = "trigger_run_id"  # TriggerDagRunOperator
+            self.note = "note"  # TriggerDagRunOperator
             self.hitl_summary = "hitl_summary"  # HITLOperator
             super().__init__(*args, **kwargs)
 
@@ -2899,6 +2916,7 @@ def test_task_info_af3():
         "max_active_tis_per_dagrun": None,
         "max_retry_delay": None,
         "multiple_outputs": False,
+        "note": "note",
         "operator_class": "CustomOperator",
         "operator_class_path": get_fully_qualified_class_name(task_10),
         "operator_provider_version": None,  # Custom operator doesn't have 
provider version
@@ -2979,6 +2997,7 @@ def test_task_info_af2():
             self.tol = "tol"  # SQLValueCheckOperator
             self.trigger_dag_id = "trigger_dag_id"  # TriggerDagRunOperator
             self.trigger_run_id = "trigger_run_id"  # TriggerDagRunOperator
+            self.note = "note"  # TriggerDagRunOperator
             self.hitl_summary = "hitl_summary"  # HITLOperator
             super().__init__(*args, **kwargs)
 
@@ -3063,6 +3082,7 @@ def test_task_info_af2():
         "max_threshold": "max_threshold",
         "metrics_thresholds": "metrics_thresholds",
         "min_threshold": "min_threshold",
+        "note": "note",
         "parameters": "parameters",
         "pass_value": "pass_value",
         "postoperator": "postoperator",

Reply via email to