This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 90eae569db4 OpenLineage: Include `AirflowDagRunFacet` in complete/failed events (#45615)
90eae569db4 is described below

commit 90eae569db448bf24afd4b14505f055100c1193e
Author: Maciej Obuchowski <[email protected]>
AuthorDate: Mon Jan 13 16:07:38 2025 +0100

    OpenLineage: Include `AirflowDagRunFacet` in complete/failed events (#45615)
    
    Signed-off-by: Maciej Obuchowski <[email protected]>
---
 providers/src/airflow/providers/openlineage/plugins/adapter.py  | 4 ++++
 providers/src/airflow/providers/openlineage/plugins/listener.py | 3 +++
 providers/tests/openlineage/plugins/test_adapter.py             | 4 ++++
 3 files changed, 11 insertions(+)

diff --git a/providers/src/airflow/providers/openlineage/plugins/adapter.py b/providers/src/airflow/providers/openlineage/plugins/adapter.py
index e6410991192..450cf38f002 100644
--- a/providers/src/airflow/providers/openlineage/plugins/adapter.py
+++ b/providers/src/airflow/providers/openlineage/plugins/adapter.py
@@ -377,6 +377,7 @@ class OpenLineageAdapter(LoggingMixin):
         clear_number: int,
         dag_run_state: DagRunState,
         task_ids: list[str],
+        run_facets: dict[str, RunFacet],
     ):
         try:
             event = RunEvent(
@@ -390,6 +391,7 @@ class OpenLineageAdapter(LoggingMixin):
                     facets={
                         **get_airflow_state_run_facet(dag_id, run_id, task_ids, dag_run_state),
                         **get_airflow_debug_facet(),
+                        **run_facets,
                     },
                 ),
                 inputs=[],
@@ -413,6 +415,7 @@ class OpenLineageAdapter(LoggingMixin):
         dag_run_state: DagRunState,
         task_ids: list[str],
         msg: str,
+        run_facets: dict[str, RunFacet],
     ):
         try:
             event = RunEvent(
@@ -431,6 +434,7 @@ class OpenLineageAdapter(LoggingMixin):
                         ),
                         **get_airflow_state_run_facet(dag_id, run_id, task_ids, dag_run_state),
                         **get_airflow_debug_facet(),
+                        **run_facets,
                     },
                 ),
                 inputs=[],
diff --git a/providers/src/airflow/providers/openlineage/plugins/listener.py b/providers/src/airflow/providers/openlineage/plugins/listener.py
index aefd534f155..c1da206c987 100644
--- a/providers/src/airflow/providers/openlineage/plugins/listener.py
+++ b/providers/src/airflow/providers/openlineage/plugins/listener.py
@@ -500,6 +500,7 @@ class OpenLineageListener:
                 task_ids = DagRun._get_partial_task_ids(dag_run.dag)
             else:
                 task_ids = dag_run.dag.task_ids if dag_run.dag and dag_run.dag.partial else None
+
             self.submit_callable(
                 self.adapter.dag_success,
                 dag_id=dag_run.dag_id,
@@ -509,6 +510,7 @@ class OpenLineageListener:
                 clear_number=dag_run.clear_number,
                 task_ids=task_ids,
                 dag_run_state=dag_run.get_state(),
+                run_facets={**get_airflow_dag_run_facet(dag_run)},
             )
         except BaseException as e:
             self.log.warning("OpenLineage received exception in method on_dag_run_success", exc_info=e)
@@ -543,6 +545,7 @@ class OpenLineageListener:
                 dag_run_state=dag_run.get_state(),
                 task_ids=task_ids,
                 msg=msg,
+                run_facets={**get_airflow_dag_run_facet(dag_run)},
             )
         except BaseException as e:
             self.log.warning("OpenLineage received exception in method on_dag_run_failed", exc_info=e)
diff --git a/providers/tests/openlineage/plugins/test_adapter.py b/providers/tests/openlineage/plugins/test_adapter.py
index a7f80085323..fd7f01ff61e 100644
--- a/providers/tests/openlineage/plugins/test_adapter.py
+++ b/providers/tests/openlineage/plugins/test_adapter.py
@@ -713,6 +713,7 @@ def test_emit_dag_complete_event(
         clear_number=0,
         dag_run_state=DagRunState.SUCCESS,
         task_ids=["task_0", "task_1", "task_2.test"],
+        run_facets={"airflowDagRun": AirflowDagRunFacet(dag={"description": "dag desc"}, dagRun=dag_run)},
     )
 
     client.emit.assert_called_once_with(
@@ -731,6 +732,7 @@ def test_emit_dag_complete_event(
                         },
                     ),
                     "debug": AirflowDebugRunFacet(packages=ANY),
+                    "airflowDagRun": AirflowDagRunFacet(dag={"description": "dag desc"}, dagRun=dag_run),
                 },
             ),
             job=Job(
@@ -804,6 +806,7 @@ def test_emit_dag_failed_event(
         dag_run_state=DagRunState.FAILED,
         task_ids=["task_0", "task_1", "task_2.test"],
         msg="error msg",
+        run_facets={"airflowDagRun": AirflowDagRunFacet(dag={"description": "dag desc"}, dagRun=dag_run)},
     )
 
     client.emit.assert_called_once_with(
@@ -825,6 +828,7 @@ def test_emit_dag_failed_event(
                         },
                     ),
                     "debug": AirflowDebugRunFacet(packages=ANY),
+                    "airflowDagRun": AirflowDagRunFacet(dag={"description": "dag desc"}, dagRun=dag_run),
                 },
             ),
             job=Job(

Reply via email to