This is an automated email from the ASF dual-hosted git repository.

mobuchowski pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 810ad03918f fix: ol root macros should reflect root from dagrun conf 
parent (#58428)
810ad03918f is described below

commit 810ad03918fcf5fc6bc8bdd307666f12adc00e2e
Author: Kacper Muda <[email protected]>
AuthorDate: Tue Nov 18 15:23:20 2025 +0100

    fix: ol root macros should reflect root from dagrun conf parent (#58428)
---
 .../providers/openlineage/plugins/macros.py        |  12 +-
 .../tests/unit/openlineage/plugins/test_macros.py  | 306 ++++++++++++---------
 2 files changed, 181 insertions(+), 137 deletions(-)

diff --git 
a/providers/openlineage/src/airflow/providers/openlineage/plugins/macros.py 
b/providers/openlineage/src/airflow/providers/openlineage/plugins/macros.py
index 508c89eafda..58d1a5b452a 100644
--- a/providers/openlineage/src/airflow/providers/openlineage/plugins/macros.py
+++ b/providers/openlineage/src/airflow/providers/openlineage/plugins/macros.py
@@ -20,7 +20,11 @@ from typing import TYPE_CHECKING
 
 from airflow.providers.openlineage import conf
 from airflow.providers.openlineage.plugins.adapter import OpenLineageAdapter
-from airflow.providers.openlineage.utils.utils import get_job_name, 
get_root_information_from_dagrun_conf
+from airflow.providers.openlineage.utils.utils import (
+    get_job_name,
+    get_parent_information_from_dagrun_conf,
+    get_root_information_from_dagrun_conf,
+)
 from airflow.providers.openlineage.version_compat import AIRFLOW_V_3_0_PLUS
 
 if TYPE_CHECKING:
@@ -136,7 +140,13 @@ def lineage_root_job_namespace(task_instance: 
TaskInstance):
 
 def _get_ol_root_id(id_key: str, task_instance: TaskInstance) -> str | None:
     dr_conf = _get_dag_run_conf(task_instance=task_instance)
+    # Check DagRun conf for root info
     ol_root_info = get_root_information_from_dagrun_conf(dr_conf=dr_conf)
+    if ol_root_info and ol_root_info.get(id_key):
+        return ol_root_info[id_key]
+    # Then check DagRun conf for parent info that is used as root in case 
explicit root is missing
+    id_key = id_key.replace("root_", "")
+    ol_root_info = get_parent_information_from_dagrun_conf(dr_conf=dr_conf)
     if ol_root_info and ol_root_info.get(id_key):
         return ol_root_info[id_key]
     return None
diff --git 
a/providers/openlineage/tests/unit/openlineage/plugins/test_macros.py 
b/providers/openlineage/tests/unit/openlineage/plugins/test_macros.py
index 3cdde5d183f..5a0dcb6e348 100644
--- a/providers/openlineage/tests/unit/openlineage/plugins/test_macros.py
+++ b/providers/openlineage/tests/unit/openlineage/plugins/test_macros.py
@@ -133,8 +133,30 @@ def 
test_lineage_root_run_id_with_runtime_task_instance(create_runtime_ti):
         pytest.fail(f"lineage_root_run_id should not throw AttributeError with 
RuntimeTaskInstance: {e}")
 
 
[email protected](
+    "conf",
+    (
+        {
+            "openlineage": {
+                "rootParentRunId": "22222222-2222-2222-2222-222222222222",
+                "rootParentJobNamespace": "rootns",
+                "rootParentJobName": "rootjob",
+            }
+        },
+        {
+            "openlineage": {
+                "rootParentRunId": "22222222-2222-2222-2222-222222222222",
+                "rootParentJobNamespace": "rootns",
+                "rootParentJobName": "rootjob",
+                "parentRunId": "33333333-3333-3333-3333-333333333333",
+                "parentJobNamespace": "parentns",
+                "parentJobName": "parentjob",
+            }
+        },
+    ),
+)
 @pytest.mark.skipif(not AIRFLOW_V_3_0_PLUS, reason="Test only for Airflow 
3.0+")
-def test_lineage_root_run_id_no_conf_af3(create_runtime_ti):
+def test_lineage_root_macros_use_root_from_conf_af3(create_runtime_ti, conf):
     from airflow.providers.common.compat.sdk import BaseOperator
 
     task = BaseOperator(task_id="test_task")
@@ -143,15 +165,19 @@ def 
test_lineage_root_run_id_no_conf_af3(create_runtime_ti):
         task=task,
         dag_id="test_dag",
         run_id="test_run_id",
-        conf=None,
+        conf=conf,
     )
 
-    result = lineage_root_run_id(runtime_ti)
-    assert result == "01937fbb-4680-70b3-b49b-1de6b041527a"
+    root_run_id = lineage_root_run_id(runtime_ti)
+    root_job_name = lineage_root_job_name(runtime_ti)
+    root_job_namespace = lineage_root_job_namespace(runtime_ti)
+    assert root_run_id == "22222222-2222-2222-2222-222222222222"
+    assert root_job_name == "rootjob"
+    assert root_job_namespace == "rootns"
 
 
 @pytest.mark.skipif(not AIRFLOW_V_3_0_PLUS, reason="Test only for Airflow 
3.0+")
-def test_lineage_root_run_id_with_conf_af3(create_runtime_ti):
+def 
test_lineage_root_macros_use_parent_from_conf_when_root_missing_af3(create_runtime_ti):
     from airflow.providers.common.compat.sdk import BaseOperator
 
     task = BaseOperator(task_id="test_task")
@@ -162,60 +188,57 @@ def 
test_lineage_root_run_id_with_conf_af3(create_runtime_ti):
         run_id="test_run_id",
         conf={
             "openlineage": {
-                "rootParentRunId": "22222222-2222-2222-2222-222222222222",
-                "rootParentJobNamespace": "rootns",
-                "rootParentJobName": "rootjob",
+                "parentRunId": "33333333-3333-3333-3333-333333333333",
+                "parentJobNamespace": "parentns",
+                "parentJobName": "parentjob",
             }
         },
     )
 
-    result = lineage_root_run_id(runtime_ti)
-    assert result == "22222222-2222-2222-2222-222222222222"
-
-
-def test_lineage_root_run_id_without_conf_af2():
-    date = datetime(2020, 1, 1, 1, 1, 1, 0, tzinfo=timezone.utc)
-    conf = {}
-    dag_run = mock.MagicMock(run_id="run_id", conf=conf)
-    dag_run.logical_date = date
-    dag_run.clear_number = 1
-    task_instance = mock.MagicMock(
-        dag_id="dag_id",
-        task_id="task_id",
-        dag_run=dag_run,
-        logical_date=date,
-        try_number=1,
-    )
-
-    call_result1 = lineage_root_run_id(task_instance)
-    call_result2 = lineage_root_run_id(task_instance)
-
-    # random part value does not matter, it just has to be the same for the 
same TaskInstance
-    assert call_result1 == call_result2
-    assert call_result1 == "016f5e9e-c4c8-7c30-9eda-d9c646d633ea"
-
-
-def test_lineage_root_run_id_with_conf_af2():
-    conf = {
-        "openlineage": {
-            "rootParentRunId": "22222222-2222-2222-2222-222222222222",
-            "rootParentJobNamespace": "rootns",
-            "rootParentJobName": "rootjob",
-        }
-    }
-    task_instance = mock.MagicMock(
-        dag_run=mock.MagicMock(conf=conf),
-    )
-
-    call_result1 = lineage_root_run_id(task_instance)
-    call_result2 = lineage_root_run_id(task_instance)
-
-    assert call_result1 == call_result2
-    assert call_result1 == "22222222-2222-2222-2222-222222222222"
-
-
+    root_run_id = lineage_root_run_id(runtime_ti)
+    root_job_name = lineage_root_job_name(runtime_ti)
+    root_job_namespace = lineage_root_job_namespace(runtime_ti)
+    assert root_run_id == "33333333-3333-3333-3333-333333333333"
+    assert root_job_name == "parentjob"
+    assert root_job_namespace == "parentns"
+
+
[email protected](
+    "conf",
+    (
+        {},
+        None,
+        {"some": "other"},
+        {"openlineage": {}},
+        {"openlineage": "some"},
+        {"openlineage": {"rootParentRunId": 
"22222222-2222-2222-2222-222222222222"}},
+        {
+            "openlineage": {
+                "rootParentRunId": "22222222-2222-2222-2222-222222222222",
+                "rootParentJobName": "rootjob",
+            }
+        },
+        {
+            "openlineage": {
+                "parentRunId": "33333333-3333-3333-3333-333333333333",
+            }
+        },
+        {
+            "openlineage": {
+                "parentRunId": "33333333-3333-3333-3333-333333333333",
+                "parentJobName": "parentjob",
+            }
+        },
+        {
+            "openlineage": {
+                "rootParentRunId": "22222222-2222-2222-2222-222222222222",
+                "parentRunId": "33333333-3333-3333-3333-333333333333",
+            }
+        },
+    ),
+)
 @pytest.mark.skipif(not AIRFLOW_V_3_0_PLUS, reason="Test only for Airflow 
3.0+")
-def test_lineage_root_job_name_no_conf_af3(create_runtime_ti):
+def 
test_lineage_root_macros_use_dagrun_info_when_missing_or_invalid_conf_af3(create_runtime_ti,
 conf):
     from airflow.providers.common.compat.sdk import BaseOperator
 
     task = BaseOperator(task_id="test_task")
@@ -224,111 +247,122 @@ def 
test_lineage_root_job_name_no_conf_af3(create_runtime_ti):
         task=task,
         dag_id="test_dag",
         run_id="test_run_id",
-        conf=None,
+        conf=conf,
     )
 
-    result = lineage_root_job_name(runtime_ti)
-    assert result == "test_dag"
+    root_run_id = lineage_root_run_id(runtime_ti)
+    root_job_name = lineage_root_job_name(runtime_ti)
+    root_job_namespace = lineage_root_job_namespace(runtime_ti)
+    assert root_run_id == "01937fbb-4680-70b3-b49b-1de6b041527a"
+    assert root_job_name == "test_dag"
+    assert root_job_namespace == _DAG_NAMESPACE
 
 
[email protected](not AIRFLOW_V_3_0_PLUS, reason="Test only for Airflow 
3.0+")
-def test_lineage_root_job_name_with_conf_af3(create_runtime_ti):
-    from airflow.providers.common.compat.sdk import BaseOperator
-
-    task = BaseOperator(task_id="test_task")
-
-    runtime_ti = create_runtime_ti(
-        task=task,
-        dag_id="test_dag",
-        run_id="test_run_id",
-        conf={
[email protected](
+    "conf",
+    (
+        {
             "openlineage": {
                 "rootParentRunId": "22222222-2222-2222-2222-222222222222",
                 "rootParentJobNamespace": "rootns",
                 "rootParentJobName": "rootjob",
             }
         },
-    )
-
-    result = lineage_root_job_name(runtime_ti)
-    assert result == "rootjob"
-
-
-def test_lineage_root_job_name_without_conf_af2():
-    task_instance = mock.MagicMock(
-        dag_id="dag_id",
-        dag_run=mock.MagicMock(conf={}),
-    )
+        {
+            "openlineage": {
+                "rootParentRunId": "22222222-2222-2222-2222-222222222222",
+                "rootParentJobNamespace": "rootns",
+                "rootParentJobName": "rootjob",
+                "parentRunId": "33333333-3333-3333-3333-333333333333",
+                "parentJobNamespace": "parentns",
+                "parentJobName": "parentjob",
+            }
+        },
+    ),
+)
[email protected](AIRFLOW_V_3_0_PLUS, reason="Test only for Airflow 2")
+def test_lineage_root_macros_use_root_from_conf_af2(conf):
+    task_instance = mock.MagicMock(dag_run=mock.MagicMock(conf=conf))
 
-    result = lineage_root_job_name(task_instance)
-    assert result == "dag_id"
+    root_run_id = lineage_root_run_id(task_instance)
+    root_job_name = lineage_root_job_name(task_instance)
+    root_job_namespace = lineage_root_job_namespace(task_instance)
+    assert root_run_id == "22222222-2222-2222-2222-222222222222"
+    assert root_job_name == "rootjob"
+    assert root_job_namespace == "rootns"
 
 
-def test_lineage_root_job_name_with_conf_af2():
[email protected](AIRFLOW_V_3_0_PLUS, reason="Test only for Airflow 2")
+def test_lineage_root_macros_use_parent_from_conf_when_root_missing_af2():
     conf = {
         "openlineage": {
-            "rootParentRunId": "22222222-2222-2222-2222-222222222222",
-            "rootParentJobNamespace": "rootns",
-            "rootParentJobName": "rootjob",
+            "parentRunId": "33333333-3333-3333-3333-333333333333",
+            "parentJobNamespace": "parentns",
+            "parentJobName": "parentjob",
         }
     }
     task_instance = mock.MagicMock(dag_run=mock.MagicMock(conf=conf))
 
-    result = lineage_root_job_name(task_instance)
-    assert result == "rootjob"
-
-
[email protected](not AIRFLOW_V_3_0_PLUS, reason="Test only for Airflow 
3.0+")
-def test_lineage_root_job_namespace_no_conf_af3(create_runtime_ti):
-    from airflow.providers.common.compat.sdk import BaseOperator
-
-    task = BaseOperator(task_id="test_task")
-
-    runtime_ti = create_runtime_ti(task=task, dag_id="test_dag", 
run_id="test_run_id", conf=None)
-
-    result = lineage_root_job_namespace(runtime_ti)
-    assert result == _DAG_NAMESPACE
-
-
[email protected](not AIRFLOW_V_3_0_PLUS, reason="Test only for Airflow 
3.0+")
-def test_lineage_root_job_namespace_with_conf_af3(create_runtime_ti):
-    from airflow.providers.common.compat.sdk import BaseOperator
-
-    task = BaseOperator(task_id="test_task")
-
-    runtime_ti = create_runtime_ti(
-        task=task,
-        dag_id="test_dag",
-        run_id="test_run_id",
-        conf={
+    root_run_id = lineage_root_run_id(task_instance)
+    root_job_name = lineage_root_job_name(task_instance)
+    root_job_namespace = lineage_root_job_namespace(task_instance)
+    assert root_run_id == "33333333-3333-3333-3333-333333333333"
+    assert root_job_name == "parentjob"
+    assert root_job_namespace == "parentns"
+
+
[email protected](
+    "conf",
+    (
+        {},
+        None,
+        {"some": "other"},
+        {"openlineage": {}},
+        {"openlineage": "some"},
+        {"openlineage": {"rootParentRunId": 
"22222222-2222-2222-2222-222222222222"}},
+        {
             "openlineage": {
                 "rootParentRunId": "22222222-2222-2222-2222-222222222222",
-                "rootParentJobNamespace": "rootns",
                 "rootParentJobName": "rootjob",
             }
         },
+        {
+            "openlineage": {
+                "parentRunId": "33333333-3333-3333-3333-333333333333",
+            }
+        },
+        {
+            "openlineage": {
+                "parentRunId": "33333333-3333-3333-3333-333333333333",
+                "parentJobName": "parentjob",
+            }
+        },
+        {
+            "openlineage": {
+                "rootParentRunId": "22222222-2222-2222-2222-222222222222",
+                "parentRunId": "33333333-3333-3333-3333-333333333333",
+            }
+        },
+    ),
+)
[email protected](AIRFLOW_V_3_0_PLUS, reason="Test only for Airflow 2")
+def 
test_lineage_root_macros_use_dagrun_info_when_missing_or_invalid_conf_af2(conf):
+    date = datetime(2020, 1, 1, 1, 1, 1, 0, tzinfo=timezone.utc)
+    conf = {}
+    dag_run = mock.MagicMock(run_id="run_id", conf=conf)
+    dag_run.logical_date = date
+    dag_run.clear_number = 1
+    task_instance = mock.MagicMock(
+        dag_id="dag_id",
+        task_id="task_id",
+        dag_run=dag_run,
+        logical_date=date,
+        try_number=1,
     )
 
-    result = lineage_root_job_namespace(runtime_ti)
-    assert result == "rootns"
-
-
-def test_lineage_root_job_namespace_without_conf_af2():
-    task_instance = mock.MagicMock(dag_run=mock.MagicMock(conf={}))
-
-    result = lineage_root_job_namespace(task_instance)
-    assert result == _DAG_NAMESPACE
-
-
-def test_lineage_root_job_namespace_with_conf_af2():
-    conf = {
-        "openlineage": {
-            "rootParentRunId": "22222222-2222-2222-2222-222222222222",
-            "rootParentJobNamespace": "rootns",
-            "rootParentJobName": "rootjob",
-        }
-    }
-    task_instance = mock.MagicMock(dag_run=mock.MagicMock(conf=conf))
-
-    result = lineage_root_job_namespace(task_instance)
-    assert result == "rootns"
+    root_run_id = lineage_root_run_id(task_instance)
+    root_job_name = lineage_root_job_name(task_instance)
+    root_job_namespace = lineage_root_job_namespace(task_instance)
+    assert root_run_id == "016f5e9e-c4c8-7c30-9eda-d9c646d633ea"
+    assert root_job_name == "dag_id"
+    assert root_job_namespace == _DAG_NAMESPACE

Reply via email to