This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 9fad1316b4f tests: Add more information to check in OL system test 
(#46379)
9fad1316b4f is described below

commit 9fad1316b4f8341e9e2a2a42065ee01e5aa501a6
Author: Kacper Muda <[email protected]>
AuthorDate: Wed Feb 5 17:35:27 2025 +0100

    tests: Add more information to check in OL system test (#46379)
    
    Signed-off-by: Kacper Muda <[email protected]>
---
 .../system/openlineage/example_openlineage.json    | 145 ++++++++++++++++++++-
 .../system/openlineage/example_openlineage.py      |   4 +-
 .../example_openlineage_mapped_sensor.py           |   3 +-
 3 files changed, 145 insertions(+), 7 deletions(-)

diff --git 
a/providers/openlineage/tests/system/openlineage/example_openlineage.json 
b/providers/openlineage/tests/system/openlineage/example_openlineage.json
index 0db8bc53e22..3332d6650d4 100644
--- a/providers/openlineage/tests/system/openlineage/example_openlineage.json
+++ b/providers/openlineage/tests/system/openlineage/example_openlineage.json
@@ -2,17 +2,93 @@
     {
         "eventType": "START",
         "eventTime": "{{ is_datetime(result) }}",
+        "producer": "{{ 
result.startswith('https://github.com/apache/airflow/tree/providers-openlineage')
 }}",
+        "schemaURL": "{{ result.startswith('https://openlineage.io/spec') }}",
+        "inputs": [],
+        "outputs": [],
         "run": {
-            "runId": "{{ is_uuid(result) }}"
+            "runId": "{{ is_uuid(result) }}",
+            "facets": {
+                "parent": {
+                    "job": {
+                        "namespace": "{{ result is string }}",
+                        "name": "openlineage_basic_dag"
+                    },
+                    "run": {
+                        "runId": "{{ is_uuid(result) }}"
+                    }
+                },
+                "airflow": {
+                    "dag": {
+                        "dag_id": "openlineage_basic_dag",
+                        "fileloc": "{{ 
result.endswith('openlineage/example_openlineage.py') }}",
+                        "owner": "airflow",
+                        "start_date": "{{ is_datetime(result) }}"
+                    },
+                    "dagRun": {
+                        "conf": {},
+                        "dag_id": "openlineage_basic_dag",
+                        "data_interval_end": "{{ is_datetime(result) }}",
+                        "data_interval_start": "{{ is_datetime(result) }}",
+                        "start_date": "{{ is_datetime(result) }}"
+                    },
+                    "taskInstance": {
+                        "try_number": "{{ result is number }}",
+                        "queued_dttm": "{{ is_datetime(result) }}",
+                        "log_url": "{{ result is string }}"
+                    },
+                    "task": {
+                        "inlets": "[]",
+                        "mapped": false,
+                        "outlets": "[]",
+                        "task_id": "do_nothing_task",
+                        "trigger_rule": "all_success",
+                        "operator_class": "PythonOperator",
+                        "retries": "{{ result is number }}",
+                        "depends_on_past": false,
+                        "executor_config": {},
+                        "priority_weight": 1,
+                        "multiple_outputs": false,
+                        "upstream_task_ids": "[]",
+                        "downstream_task_ids": "['check_events']",
+                        "operator_class_path": "{{ 
result.endswith('.PythonOperator') }}",
+                        "wait_for_downstream": false,
+                        "retry_exponential_backoff": false,
+                        "ignore_first_depends_on_past": false,
+                        "wait_for_past_depends_before_skipping": false
+                    },
+                    "taskUuid": "{{ is_uuid(result) }}"
+                },
+                "nominalTime": {
+                    "nominalEndTime": "{{ is_datetime(result) }}",
+                    "nominalStartTime": "{{ is_datetime(result) }}"
+                },
+                "processing_engine": {
+                    "name": "Airflow",
+                    "openlineageAdapterVersion": "{{ result is string }}",
+                    "version": "{{ result is string }}"
+                }
+            }
         },
         "job": {
-            "namespace": "default",
+            "namespace": "{{ result is string }}",
             "name": "openlineage_basic_dag.do_nothing_task",
             "facets": {
                 "jobType": {
                     "integration": "AIRFLOW",
                     "jobType": "TASK",
                     "processingType": "BATCH"
+                },
+                "ownership": {
+                    "owners": [
+                        {
+                            "name": "{{ result is string }}"
+                        }
+                    ]
+                },
+                "sourceCode": {
+                    "language": "python",
+                    "sourceCode": "def do_nothing():\n    pass\n"
                 }
             }
         }
@@ -21,16 +97,77 @@
         "eventType": "COMPLETE",
         "eventTime": "{{ is_datetime(result) }}",
         "run": {
-            "runId": "{{ is_uuid(result) }}"
+            "runId": "{{ is_uuid(result) }}",
+            "facets": {
+                "parent": {
+                    "job": {
+                        "namespace": "{{ result is string }}",
+                        "name": "openlineage_basic_dag"
+                    },
+                    "run": {
+                        "runId": "{{ is_uuid(result) }}"
+                    }
+                },
+                "airflow": {
+                    "dag": {
+                        "dag_id": "openlineage_basic_dag",
+                        "fileloc": "{{ 
result.endswith('openlineage/example_openlineage.py') }}",
+                        "owner": "airflow",
+                        "start_date": "{{ is_datetime(result) }}"
+                    },
+                    "dagRun": {
+                        "conf": {},
+                        "dag_id": "openlineage_basic_dag",
+                        "data_interval_end": "{{ is_datetime(result) }}",
+                        "data_interval_start": "{{ is_datetime(result) }}",
+                        "start_date": "{{ is_datetime(result) }}"
+                    },
+                    "taskInstance": {
+                        "try_number": "{{ result is number }}",
+                        "queued_dttm": "{{ is_datetime(result) }}",
+                        "log_url": "{{ result is string }}"
+                    },
+                    "task": {
+                        "inlets": "[]",
+                        "mapped": false,
+                        "outlets": "[]",
+                        "task_id": "do_nothing_task",
+                        "trigger_rule": "all_success",
+                        "operator_class": "PythonOperator",
+                        "retries": "{{ result is number }}",
+                        "depends_on_past": false,
+                        "executor_config": {},
+                        "priority_weight": 1,
+                        "multiple_outputs": false,
+                        "upstream_task_ids": "[]",
+                        "downstream_task_ids": "['check_events']",
+                        "operator_class_path": "{{ 
result.endswith('.PythonOperator') }}",
+                        "wait_for_downstream": false,
+                        "retry_exponential_backoff": false,
+                        "ignore_first_depends_on_past": false,
+                        "wait_for_past_depends_before_skipping": false
+                    },
+                    "taskUuid": "{{ is_uuid(result) }}"
+                },
+                "processing_engine": {
+                    "name": "Airflow",
+                    "openlineageAdapterVersion": "{{ result is string }}",
+                    "version": "{{ result is string }}"
+                }
+            }
         },
         "job": {
-            "namespace": "default",
+            "namespace": "{{ result is string }}",
             "name": "openlineage_basic_dag.do_nothing_task",
             "facets": {
                 "jobType": {
                     "integration": "AIRFLOW",
                     "jobType": "TASK",
                     "processingType": "BATCH"
+                },
+                "sourceCode": {
+                    "language": "python",
+                    "sourceCode": "def do_nothing():\n    pass\n"
                 }
             }
         }
diff --git 
a/providers/openlineage/tests/system/openlineage/example_openlineage.py 
b/providers/openlineage/tests/system/openlineage/example_openlineage.py
index 8e632d2c2dc..28b92540ef4 100644
--- a/providers/openlineage/tests/system/openlineage/example_openlineage.py
+++ b/providers/openlineage/tests/system/openlineage/example_openlineage.py
@@ -16,8 +16,8 @@
 # under the License.
 from __future__ import annotations
 
-import os
 from datetime import datetime
+from pathlib import Path
 
 from providers.openlineage.tests.system.openlineage.operator import 
OpenLineageTestOperator
 
@@ -43,7 +43,7 @@ with DAG(
 
     check_events = OpenLineageTestOperator(
         task_id="check_events",
-        
file_path=f"{os.getenv('AIRFLOW_HOME')}/dags/providers/tests/system/openlineage/example_openlineage.json",
+        file_path=str(Path(__file__).parent / "example_openlineage.json"),
     )
 
     nothing_task >> check_events
diff --git 
a/providers/openlineage/tests/system/openlineage/example_openlineage_mapped_sensor.py
 
b/providers/openlineage/tests/system/openlineage/example_openlineage_mapped_sensor.py
index 44fecc85a35..f49b6e591b3 100644
--- 
a/providers/openlineage/tests/system/openlineage/example_openlineage_mapped_sensor.py
+++ 
b/providers/openlineage/tests/system/openlineage/example_openlineage_mapped_sensor.py
@@ -18,6 +18,7 @@ from __future__ import annotations
 
 import os
 from datetime import datetime, timedelta
+from pathlib import Path
 
 from providers.openlineage.tests.system.openlineage.operator import 
OpenLineageTestOperator
 
@@ -68,7 +69,7 @@ with DAG(
 
     check_events = OpenLineageTestOperator(
         task_id="check_events",
-        
file_path=f"{os.getenv('AIRFLOW_HOME')}/dags/providers/tests/system/openlineage/example_openlineage_mapped_sensor.json",
+        file_path=str(Path(__file__).parent / 
"example_openlineage_mapped_sensor.json"),
         allow_duplicate_events=True,
     )
 

Reply via email to