This is an automated email from the ASF dual-hosted git repository.
mobuchowski pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 810ad03918f fix: ol root macros should reflect root from dagrun conf
parent (#58428)
810ad03918f is described below
commit 810ad03918fcf5fc6bc8bdd307666f12adc00e2e
Author: Kacper Muda <[email protected]>
AuthorDate: Tue Nov 18 15:23:20 2025 +0100
fix: ol root macros should reflect root from dagrun conf parent (#58428)
---
.../providers/openlineage/plugins/macros.py | 12 +-
.../tests/unit/openlineage/plugins/test_macros.py | 306 ++++++++++++---------
2 files changed, 181 insertions(+), 137 deletions(-)
diff --git
a/providers/openlineage/src/airflow/providers/openlineage/plugins/macros.py
b/providers/openlineage/src/airflow/providers/openlineage/plugins/macros.py
index 508c89eafda..58d1a5b452a 100644
--- a/providers/openlineage/src/airflow/providers/openlineage/plugins/macros.py
+++ b/providers/openlineage/src/airflow/providers/openlineage/plugins/macros.py
@@ -20,7 +20,11 @@ from typing import TYPE_CHECKING
from airflow.providers.openlineage import conf
from airflow.providers.openlineage.plugins.adapter import OpenLineageAdapter
-from airflow.providers.openlineage.utils.utils import get_job_name,
get_root_information_from_dagrun_conf
+from airflow.providers.openlineage.utils.utils import (
+ get_job_name,
+ get_parent_information_from_dagrun_conf,
+ get_root_information_from_dagrun_conf,
+)
from airflow.providers.openlineage.version_compat import AIRFLOW_V_3_0_PLUS
if TYPE_CHECKING:
@@ -136,7 +140,13 @@ def lineage_root_job_namespace(task_instance:
TaskInstance):
def _get_ol_root_id(id_key: str, task_instance: TaskInstance) -> str | None:
dr_conf = _get_dag_run_conf(task_instance=task_instance)
+ # Check DagRun conf for root info
ol_root_info = get_root_information_from_dagrun_conf(dr_conf=dr_conf)
+ if ol_root_info and ol_root_info.get(id_key):
+ return ol_root_info[id_key]
+ # Then check DagRun conf for parent info that is used as root in case
explicit root is missing
+ id_key = id_key.replace("root_", "")
+ ol_root_info = get_parent_information_from_dagrun_conf(dr_conf=dr_conf)
if ol_root_info and ol_root_info.get(id_key):
return ol_root_info[id_key]
return None
diff --git
a/providers/openlineage/tests/unit/openlineage/plugins/test_macros.py
b/providers/openlineage/tests/unit/openlineage/plugins/test_macros.py
index 3cdde5d183f..5a0dcb6e348 100644
--- a/providers/openlineage/tests/unit/openlineage/plugins/test_macros.py
+++ b/providers/openlineage/tests/unit/openlineage/plugins/test_macros.py
@@ -133,8 +133,30 @@ def
test_lineage_root_run_id_with_runtime_task_instance(create_runtime_ti):
pytest.fail(f"lineage_root_run_id should not throw AttributeError with
RuntimeTaskInstance: {e}")
[email protected](
+ "conf",
+ (
+ {
+ "openlineage": {
+ "rootParentRunId": "22222222-2222-2222-2222-222222222222",
+ "rootParentJobNamespace": "rootns",
+ "rootParentJobName": "rootjob",
+ }
+ },
+ {
+ "openlineage": {
+ "rootParentRunId": "22222222-2222-2222-2222-222222222222",
+ "rootParentJobNamespace": "rootns",
+ "rootParentJobName": "rootjob",
+ "parentRunId": "33333333-3333-3333-3333-333333333333",
+ "parentJobNamespace": "parentns",
+ "parentJobName": "parentjob",
+ }
+ },
+ ),
+)
@pytest.mark.skipif(not AIRFLOW_V_3_0_PLUS, reason="Test only for Airflow
3.0+")
-def test_lineage_root_run_id_no_conf_af3(create_runtime_ti):
+def test_lineage_root_macros_use_root_from_conf_af3(create_runtime_ti, conf):
from airflow.providers.common.compat.sdk import BaseOperator
task = BaseOperator(task_id="test_task")
@@ -143,15 +165,19 @@ def
test_lineage_root_run_id_no_conf_af3(create_runtime_ti):
task=task,
dag_id="test_dag",
run_id="test_run_id",
- conf=None,
+ conf=conf,
)
- result = lineage_root_run_id(runtime_ti)
- assert result == "01937fbb-4680-70b3-b49b-1de6b041527a"
+ root_run_id = lineage_root_run_id(runtime_ti)
+ root_job_name = lineage_root_job_name(runtime_ti)
+ root_job_namespace = lineage_root_job_namespace(runtime_ti)
+ assert root_run_id == "22222222-2222-2222-2222-222222222222"
+ assert root_job_name == "rootjob"
+ assert root_job_namespace == "rootns"
@pytest.mark.skipif(not AIRFLOW_V_3_0_PLUS, reason="Test only for Airflow
3.0+")
-def test_lineage_root_run_id_with_conf_af3(create_runtime_ti):
+def
test_lineage_root_macros_use_parent_from_conf_when_root_missing_af3(create_runtime_ti):
from airflow.providers.common.compat.sdk import BaseOperator
task = BaseOperator(task_id="test_task")
@@ -162,60 +188,57 @@ def
test_lineage_root_run_id_with_conf_af3(create_runtime_ti):
run_id="test_run_id",
conf={
"openlineage": {
- "rootParentRunId": "22222222-2222-2222-2222-222222222222",
- "rootParentJobNamespace": "rootns",
- "rootParentJobName": "rootjob",
+ "parentRunId": "33333333-3333-3333-3333-333333333333",
+ "parentJobNamespace": "parentns",
+ "parentJobName": "parentjob",
}
},
)
- result = lineage_root_run_id(runtime_ti)
- assert result == "22222222-2222-2222-2222-222222222222"
-
-
-def test_lineage_root_run_id_without_conf_af2():
- date = datetime(2020, 1, 1, 1, 1, 1, 0, tzinfo=timezone.utc)
- conf = {}
- dag_run = mock.MagicMock(run_id="run_id", conf=conf)
- dag_run.logical_date = date
- dag_run.clear_number = 1
- task_instance = mock.MagicMock(
- dag_id="dag_id",
- task_id="task_id",
- dag_run=dag_run,
- logical_date=date,
- try_number=1,
- )
-
- call_result1 = lineage_root_run_id(task_instance)
- call_result2 = lineage_root_run_id(task_instance)
-
- # random part value does not matter, it just has to be the same for the
same TaskInstance
- assert call_result1 == call_result2
- assert call_result1 == "016f5e9e-c4c8-7c30-9eda-d9c646d633ea"
-
-
-def test_lineage_root_run_id_with_conf_af2():
- conf = {
- "openlineage": {
- "rootParentRunId": "22222222-2222-2222-2222-222222222222",
- "rootParentJobNamespace": "rootns",
- "rootParentJobName": "rootjob",
- }
- }
- task_instance = mock.MagicMock(
- dag_run=mock.MagicMock(conf=conf),
- )
-
- call_result1 = lineage_root_run_id(task_instance)
- call_result2 = lineage_root_run_id(task_instance)
-
- assert call_result1 == call_result2
- assert call_result1 == "22222222-2222-2222-2222-222222222222"
-
-
+ root_run_id = lineage_root_run_id(runtime_ti)
+ root_job_name = lineage_root_job_name(runtime_ti)
+ root_job_namespace = lineage_root_job_namespace(runtime_ti)
+ assert root_run_id == "33333333-3333-3333-3333-333333333333"
+ assert root_job_name == "parentjob"
+ assert root_job_namespace == "parentns"
+
+
[email protected](
+ "conf",
+ (
+ {},
+ None,
+ {"some": "other"},
+ {"openlineage": {}},
+ {"openlineage": "some"},
+ {"openlineage": {"rootParentRunId":
"22222222-2222-2222-2222-222222222222"}},
+ {
+ "openlineage": {
+ "rootParentRunId": "22222222-2222-2222-2222-222222222222",
+ "rootParentJobName": "rootjob",
+ }
+ },
+ {
+ "openlineage": {
+ "parentRunId": "33333333-3333-3333-3333-333333333333",
+ }
+ },
+ {
+ "openlineage": {
+ "parentRunId": "33333333-3333-3333-3333-333333333333",
+ "parentJobName": "parentjob",
+ }
+ },
+ {
+ "openlineage": {
+ "rootParentRunId": "22222222-2222-2222-2222-222222222222",
+ "parentRunId": "33333333-3333-3333-3333-333333333333",
+ }
+ },
+ ),
+)
@pytest.mark.skipif(not AIRFLOW_V_3_0_PLUS, reason="Test only for Airflow
3.0+")
-def test_lineage_root_job_name_no_conf_af3(create_runtime_ti):
+def
test_lineage_root_macros_use_dagrun_info_when_missing_or_invalid_conf_af3(create_runtime_ti,
conf):
from airflow.providers.common.compat.sdk import BaseOperator
task = BaseOperator(task_id="test_task")
@@ -224,111 +247,122 @@ def
test_lineage_root_job_name_no_conf_af3(create_runtime_ti):
task=task,
dag_id="test_dag",
run_id="test_run_id",
- conf=None,
+ conf=conf,
)
- result = lineage_root_job_name(runtime_ti)
- assert result == "test_dag"
+ root_run_id = lineage_root_run_id(runtime_ti)
+ root_job_name = lineage_root_job_name(runtime_ti)
+ root_job_namespace = lineage_root_job_namespace(runtime_ti)
+ assert root_run_id == "01937fbb-4680-70b3-b49b-1de6b041527a"
+ assert root_job_name == "test_dag"
+ assert root_job_namespace == _DAG_NAMESPACE
[email protected](not AIRFLOW_V_3_0_PLUS, reason="Test only for Airflow
3.0+")
-def test_lineage_root_job_name_with_conf_af3(create_runtime_ti):
- from airflow.providers.common.compat.sdk import BaseOperator
-
- task = BaseOperator(task_id="test_task")
-
- runtime_ti = create_runtime_ti(
- task=task,
- dag_id="test_dag",
- run_id="test_run_id",
- conf={
[email protected](
+ "conf",
+ (
+ {
"openlineage": {
"rootParentRunId": "22222222-2222-2222-2222-222222222222",
"rootParentJobNamespace": "rootns",
"rootParentJobName": "rootjob",
}
},
- )
-
- result = lineage_root_job_name(runtime_ti)
- assert result == "rootjob"
-
-
-def test_lineage_root_job_name_without_conf_af2():
- task_instance = mock.MagicMock(
- dag_id="dag_id",
- dag_run=mock.MagicMock(conf={}),
- )
+ {
+ "openlineage": {
+ "rootParentRunId": "22222222-2222-2222-2222-222222222222",
+ "rootParentJobNamespace": "rootns",
+ "rootParentJobName": "rootjob",
+ "parentRunId": "33333333-3333-3333-3333-333333333333",
+ "parentJobNamespace": "parentns",
+ "parentJobName": "parentjob",
+ }
+ },
+ ),
+)
[email protected](AIRFLOW_V_3_0_PLUS, reason="Test only for Airflow 2")
+def test_lineage_root_macros_use_root_from_conf_af2(conf):
+ task_instance = mock.MagicMock(dag_run=mock.MagicMock(conf=conf))
- result = lineage_root_job_name(task_instance)
- assert result == "dag_id"
+ root_run_id = lineage_root_run_id(task_instance)
+ root_job_name = lineage_root_job_name(task_instance)
+ root_job_namespace = lineage_root_job_namespace(task_instance)
+ assert root_run_id == "22222222-2222-2222-2222-222222222222"
+ assert root_job_name == "rootjob"
+ assert root_job_namespace == "rootns"
-def test_lineage_root_job_name_with_conf_af2():
[email protected](AIRFLOW_V_3_0_PLUS, reason="Test only for Airflow 2")
+def test_lineage_root_macros_use_parent_from_conf_when_root_missing_af2():
conf = {
"openlineage": {
- "rootParentRunId": "22222222-2222-2222-2222-222222222222",
- "rootParentJobNamespace": "rootns",
- "rootParentJobName": "rootjob",
+ "parentRunId": "33333333-3333-3333-3333-333333333333",
+ "parentJobNamespace": "parentns",
+ "parentJobName": "parentjob",
}
}
task_instance = mock.MagicMock(dag_run=mock.MagicMock(conf=conf))
- result = lineage_root_job_name(task_instance)
- assert result == "rootjob"
-
-
[email protected](not AIRFLOW_V_3_0_PLUS, reason="Test only for Airflow
3.0+")
-def test_lineage_root_job_namespace_no_conf_af3(create_runtime_ti):
- from airflow.providers.common.compat.sdk import BaseOperator
-
- task = BaseOperator(task_id="test_task")
-
- runtime_ti = create_runtime_ti(task=task, dag_id="test_dag",
run_id="test_run_id", conf=None)
-
- result = lineage_root_job_namespace(runtime_ti)
- assert result == _DAG_NAMESPACE
-
-
[email protected](not AIRFLOW_V_3_0_PLUS, reason="Test only for Airflow
3.0+")
-def test_lineage_root_job_namespace_with_conf_af3(create_runtime_ti):
- from airflow.providers.common.compat.sdk import BaseOperator
-
- task = BaseOperator(task_id="test_task")
-
- runtime_ti = create_runtime_ti(
- task=task,
- dag_id="test_dag",
- run_id="test_run_id",
- conf={
+ root_run_id = lineage_root_run_id(task_instance)
+ root_job_name = lineage_root_job_name(task_instance)
+ root_job_namespace = lineage_root_job_namespace(task_instance)
+ assert root_run_id == "33333333-3333-3333-3333-333333333333"
+ assert root_job_name == "parentjob"
+ assert root_job_namespace == "parentns"
+
+
[email protected](
+ "conf",
+ (
+ {},
+ None,
+ {"some": "other"},
+ {"openlineage": {}},
+ {"openlineage": "some"},
+ {"openlineage": {"rootParentRunId":
"22222222-2222-2222-2222-222222222222"}},
+ {
"openlineage": {
"rootParentRunId": "22222222-2222-2222-2222-222222222222",
- "rootParentJobNamespace": "rootns",
"rootParentJobName": "rootjob",
}
},
+ {
+ "openlineage": {
+ "parentRunId": "33333333-3333-3333-3333-333333333333",
+ }
+ },
+ {
+ "openlineage": {
+ "parentRunId": "33333333-3333-3333-3333-333333333333",
+ "parentJobName": "parentjob",
+ }
+ },
+ {
+ "openlineage": {
+ "rootParentRunId": "22222222-2222-2222-2222-222222222222",
+ "parentRunId": "33333333-3333-3333-3333-333333333333",
+ }
+ },
+ ),
+)
[email protected](AIRFLOW_V_3_0_PLUS, reason="Test only for Airflow 2")
+def
test_lineage_root_macros_use_dagrun_info_when_missing_or_invalid_conf_af2(conf):
+ date = datetime(2020, 1, 1, 1, 1, 1, 0, tzinfo=timezone.utc)
+ conf = {}
+ dag_run = mock.MagicMock(run_id="run_id", conf=conf)
+ dag_run.logical_date = date
+ dag_run.clear_number = 1
+ task_instance = mock.MagicMock(
+ dag_id="dag_id",
+ task_id="task_id",
+ dag_run=dag_run,
+ logical_date=date,
+ try_number=1,
)
- result = lineage_root_job_namespace(runtime_ti)
- assert result == "rootns"
-
-
-def test_lineage_root_job_namespace_without_conf_af2():
- task_instance = mock.MagicMock(dag_run=mock.MagicMock(conf={}))
-
- result = lineage_root_job_namespace(task_instance)
- assert result == _DAG_NAMESPACE
-
-
-def test_lineage_root_job_namespace_with_conf_af2():
- conf = {
- "openlineage": {
- "rootParentRunId": "22222222-2222-2222-2222-222222222222",
- "rootParentJobNamespace": "rootns",
- "rootParentJobName": "rootjob",
- }
- }
- task_instance = mock.MagicMock(dag_run=mock.MagicMock(conf=conf))
-
- result = lineage_root_job_namespace(task_instance)
- assert result == "rootns"
+ root_run_id = lineage_root_run_id(task_instance)
+ root_job_name = lineage_root_job_name(task_instance)
+ root_job_namespace = lineage_root_job_namespace(task_instance)
+ assert root_run_id == "016f5e9e-c4c8-7c30-9eda-d9c646d633ea"
+ assert root_job_name == "dag_id"
+ assert root_job_namespace == _DAG_NAMESPACE