This is an automated email from the ASF dual-hosted git repository.
kaxilnaik pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 90eae569db4 OpenLineage: Include `AirflowDagRunFacet` in complete/failed events (#45615)
90eae569db4 is described below
commit 90eae569db448bf24afd4b14505f055100c1193e
Author: Maciej Obuchowski <[email protected]>
AuthorDate: Mon Jan 13 16:07:38 2025 +0100
OpenLineage: Include `AirflowDagRunFacet` in complete/failed events (#45615)
Signed-off-by: Maciej Obuchowski <[email protected]>
---
providers/src/airflow/providers/openlineage/plugins/adapter.py | 4 ++++
providers/src/airflow/providers/openlineage/plugins/listener.py | 3 +++
providers/tests/openlineage/plugins/test_adapter.py | 4 ++++
3 files changed, 11 insertions(+)
diff --git a/providers/src/airflow/providers/openlineage/plugins/adapter.py b/providers/src/airflow/providers/openlineage/plugins/adapter.py
index e6410991192..450cf38f002 100644
--- a/providers/src/airflow/providers/openlineage/plugins/adapter.py
+++ b/providers/src/airflow/providers/openlineage/plugins/adapter.py
@@ -377,6 +377,7 @@ class OpenLineageAdapter(LoggingMixin):
clear_number: int,
dag_run_state: DagRunState,
task_ids: list[str],
+ run_facets: dict[str, RunFacet],
):
try:
event = RunEvent(
@@ -390,6 +391,7 @@ class OpenLineageAdapter(LoggingMixin):
facets={
**get_airflow_state_run_facet(dag_id, run_id,
task_ids, dag_run_state),
**get_airflow_debug_facet(),
+ **run_facets,
},
),
inputs=[],
@@ -413,6 +415,7 @@ class OpenLineageAdapter(LoggingMixin):
dag_run_state: DagRunState,
task_ids: list[str],
msg: str,
+ run_facets: dict[str, RunFacet],
):
try:
event = RunEvent(
@@ -431,6 +434,7 @@ class OpenLineageAdapter(LoggingMixin):
),
**get_airflow_state_run_facet(dag_id, run_id,
task_ids, dag_run_state),
**get_airflow_debug_facet(),
+ **run_facets,
},
),
inputs=[],
diff --git a/providers/src/airflow/providers/openlineage/plugins/listener.py b/providers/src/airflow/providers/openlineage/plugins/listener.py
index aefd534f155..c1da206c987 100644
--- a/providers/src/airflow/providers/openlineage/plugins/listener.py
+++ b/providers/src/airflow/providers/openlineage/plugins/listener.py
@@ -500,6 +500,7 @@ class OpenLineageListener:
task_ids = DagRun._get_partial_task_ids(dag_run.dag)
else:
task_ids = dag_run.dag.task_ids if dag_run.dag and
dag_run.dag.partial else None
+
self.submit_callable(
self.adapter.dag_success,
dag_id=dag_run.dag_id,
@@ -509,6 +510,7 @@ class OpenLineageListener:
clear_number=dag_run.clear_number,
task_ids=task_ids,
dag_run_state=dag_run.get_state(),
+ run_facets={**get_airflow_dag_run_facet(dag_run)},
)
except BaseException as e:
self.log.warning("OpenLineage received exception in method
on_dag_run_success", exc_info=e)
@@ -543,6 +545,7 @@ class OpenLineageListener:
dag_run_state=dag_run.get_state(),
task_ids=task_ids,
msg=msg,
+ run_facets={**get_airflow_dag_run_facet(dag_run)},
)
except BaseException as e:
self.log.warning("OpenLineage received exception in method
on_dag_run_failed", exc_info=e)
diff --git a/providers/tests/openlineage/plugins/test_adapter.py b/providers/tests/openlineage/plugins/test_adapter.py
index a7f80085323..fd7f01ff61e 100644
--- a/providers/tests/openlineage/plugins/test_adapter.py
+++ b/providers/tests/openlineage/plugins/test_adapter.py
@@ -713,6 +713,7 @@ def test_emit_dag_complete_event(
clear_number=0,
dag_run_state=DagRunState.SUCCESS,
task_ids=["task_0", "task_1", "task_2.test"],
+ run_facets={"airflowDagRun": AirflowDagRunFacet(dag={"description":
"dag desc"}, dagRun=dag_run)},
)
client.emit.assert_called_once_with(
@@ -731,6 +732,7 @@ def test_emit_dag_complete_event(
},
),
"debug": AirflowDebugRunFacet(packages=ANY),
+ "airflowDagRun": AirflowDagRunFacet(dag={"description":
"dag desc"}, dagRun=dag_run),
},
),
job=Job(
@@ -804,6 +806,7 @@ def test_emit_dag_failed_event(
dag_run_state=DagRunState.FAILED,
task_ids=["task_0", "task_1", "task_2.test"],
msg="error msg",
+ run_facets={"airflowDagRun": AirflowDagRunFacet(dag={"description":
"dag desc"}, dagRun=dag_run)},
)
client.emit.assert_called_once_with(
@@ -825,6 +828,7 @@ def test_emit_dag_failed_event(
},
),
"debug": AirflowDebugRunFacet(packages=ANY),
+ "airflowDagRun": AirflowDagRunFacet(dag={"description":
"dag desc"}, dagRun=dag_run),
},
),
job=Job(