This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 9fad1316b4f tests: Add more information to check in OL system test (#46379)
9fad1316b4f is described below
commit 9fad1316b4f8341e9e2a2a42065ee01e5aa501a6
Author: Kacper Muda <[email protected]>
AuthorDate: Wed Feb 5 17:35:27 2025 +0100
tests: Add more information to check in OL system test (#46379)
Signed-off-by: Kacper Muda <[email protected]>
---
.../system/openlineage/example_openlineage.json | 145 ++++++++++++++++++++-
.../system/openlineage/example_openlineage.py | 4 +-
.../example_openlineage_mapped_sensor.py | 3 +-
3 files changed, 145 insertions(+), 7 deletions(-)
diff --git a/providers/openlineage/tests/system/openlineage/example_openlineage.json b/providers/openlineage/tests/system/openlineage/example_openlineage.json
index 0db8bc53e22..3332d6650d4 100644
--- a/providers/openlineage/tests/system/openlineage/example_openlineage.json
+++ b/providers/openlineage/tests/system/openlineage/example_openlineage.json
@@ -2,17 +2,93 @@
{
"eventType": "START",
"eventTime": "{{ is_datetime(result) }}",
+ "producer": "{{ result.startswith('https://github.com/apache/airflow/tree/providers-openlineage') }}",
+ "schemaURL": "{{ result.startswith('https://openlineage.io/spec') }}",
+ "inputs": [],
+ "outputs": [],
"run": {
- "runId": "{{ is_uuid(result) }}"
+ "runId": "{{ is_uuid(result) }}",
+ "facets": {
+ "parent": {
+ "job": {
+ "namespace": "{{ result is string }}",
+ "name": "openlineage_basic_dag"
+ },
+ "run": {
+ "runId": "{{ is_uuid(result) }}"
+ }
+ },
+ "airflow": {
+ "dag": {
+ "dag_id": "openlineage_basic_dag",
+ "fileloc": "{{ result.endswith('openlineage/example_openlineage.py') }}",
+ "owner": "airflow",
+ "start_date": "{{ is_datetime(result) }}"
+ },
+ "dagRun": {
+ "conf": {},
+ "dag_id": "openlineage_basic_dag",
+ "data_interval_end": "{{ is_datetime(result) }}",
+ "data_interval_start": "{{ is_datetime(result) }}",
+ "start_date": "{{ is_datetime(result) }}"
+ },
+ "taskInstance": {
+ "try_number": "{{ result is number }}",
+ "queued_dttm": "{{ is_datetime(result) }}",
+ "log_url": "{{ result is string }}"
+ },
+ "task": {
+ "inlets": "[]",
+ "mapped": false,
+ "outlets": "[]",
+ "task_id": "do_nothing_task",
+ "trigger_rule": "all_success",
+ "operator_class": "PythonOperator",
+ "retries": "{{ result is number }}",
+ "depends_on_past": false,
+ "executor_config": {},
+ "priority_weight": 1,
+ "multiple_outputs": false,
+ "upstream_task_ids": "[]",
+ "downstream_task_ids": "['check_events']",
+ "operator_class_path": "{{ result.endswith('.PythonOperator') }}",
+ "wait_for_downstream": false,
+ "retry_exponential_backoff": false,
+ "ignore_first_depends_on_past": false,
+ "wait_for_past_depends_before_skipping": false
+ },
+ "taskUuid": "{{ is_uuid(result) }}"
+ },
+ "nominalTime": {
+ "nominalEndTime": "{{ is_datetime(result) }}",
+ "nominalStartTime": "{{ is_datetime(result) }}"
+ },
+ "processing_engine": {
+ "name": "Airflow",
+ "openlineageAdapterVersion": "{{ result is string }}",
+ "version": "{{ result is string }}"
+ }
+ }
},
"job": {
- "namespace": "default",
+ "namespace": "{{ result is string }}",
"name": "openlineage_basic_dag.do_nothing_task",
"facets": {
"jobType": {
"integration": "AIRFLOW",
"jobType": "TASK",
"processingType": "BATCH"
+ },
+ "ownership": {
+ "owners": [
+ {
+ "name": "{{ result is string }}"
+ }
+ ]
+ },
+ "sourceCode": {
+ "language": "python",
+ "sourceCode": "def do_nothing():\n pass\n"
}
}
}
@@ -21,16 +97,77 @@
"eventType": "COMPLETE",
"eventTime": "{{ is_datetime(result) }}",
"run": {
- "runId": "{{ is_uuid(result) }}"
+ "runId": "{{ is_uuid(result) }}",
+ "facets": {
+ "parent": {
+ "job": {
+ "namespace": "{{ result is string }}",
+ "name": "openlineage_basic_dag"
+ },
+ "run": {
+ "runId": "{{ is_uuid(result) }}"
+ }
+ },
+ "airflow": {
+ "dag": {
+ "dag_id": "openlineage_basic_dag",
+ "fileloc": "{{ result.endswith('openlineage/example_openlineage.py') }}",
+ "owner": "airflow",
+ "start_date": "{{ is_datetime(result) }}"
+ },
+ "dagRun": {
+ "conf": {},
+ "dag_id": "openlineage_basic_dag",
+ "data_interval_end": "{{ is_datetime(result) }}",
+ "data_interval_start": "{{ is_datetime(result) }}",
+ "start_date": "{{ is_datetime(result) }}"
+ },
+ "taskInstance": {
+ "try_number": "{{ result is number }}",
+ "queued_dttm": "{{ is_datetime(result) }}",
+ "log_url": "{{ result is string }}"
+ },
+ "task": {
+ "inlets": "[]",
+ "mapped": false,
+ "outlets": "[]",
+ "task_id": "do_nothing_task",
+ "trigger_rule": "all_success",
+ "operator_class": "PythonOperator",
+ "retries": "{{ result is number }}",
+ "depends_on_past": false,
+ "executor_config": {},
+ "priority_weight": 1,
+ "multiple_outputs": false,
+ "upstream_task_ids": "[]",
+ "downstream_task_ids": "['check_events']",
+ "operator_class_path": "{{ result.endswith('.PythonOperator') }}",
+ "wait_for_downstream": false,
+ "retry_exponential_backoff": false,
+ "ignore_first_depends_on_past": false,
+ "wait_for_past_depends_before_skipping": false
+ },
+ "taskUuid": "{{ is_uuid(result) }}"
+ },
+ "processing_engine": {
+ "name": "Airflow",
+ "openlineageAdapterVersion": "{{ result is string }}",
+ "version": "{{ result is string }}"
+ }
+ }
},
"job": {
- "namespace": "default",
+ "namespace": "{{ result is string }}",
"name": "openlineage_basic_dag.do_nothing_task",
"facets": {
"jobType": {
"integration": "AIRFLOW",
"jobType": "TASK",
"processingType": "BATCH"
+ },
+ "sourceCode": {
+ "language": "python",
+ "sourceCode": "def do_nothing():\n pass\n"
}
}
}
diff --git a/providers/openlineage/tests/system/openlineage/example_openlineage.py b/providers/openlineage/tests/system/openlineage/example_openlineage.py
index 8e632d2c2dc..28b92540ef4 100644
--- a/providers/openlineage/tests/system/openlineage/example_openlineage.py
+++ b/providers/openlineage/tests/system/openlineage/example_openlineage.py
@@ -16,8 +16,8 @@
# under the License.
from __future__ import annotations
-import os
from datetime import datetime
+from pathlib import Path
from providers.openlineage.tests.system.openlineage.operator import OpenLineageTestOperator
@@ -43,7 +43,7 @@ with DAG(
check_events = OpenLineageTestOperator(
task_id="check_events",
- file_path=f"{os.getenv('AIRFLOW_HOME')}/dags/providers/tests/system/openlineage/example_openlineage.json",
+ file_path=str(Path(__file__).parent / "example_openlineage.json"),
)
nothing_task >> check_events
diff --git a/providers/openlineage/tests/system/openlineage/example_openlineage_mapped_sensor.py b/providers/openlineage/tests/system/openlineage/example_openlineage_mapped_sensor.py
index 44fecc85a35..f49b6e591b3 100644
--- a/providers/openlineage/tests/system/openlineage/example_openlineage_mapped_sensor.py
+++ b/providers/openlineage/tests/system/openlineage/example_openlineage_mapped_sensor.py
@@ -18,6 +18,7 @@ from __future__ import annotations
import os
from datetime import datetime, timedelta
+from pathlib import Path
from providers.openlineage.tests.system.openlineage.operator import OpenLineageTestOperator
@@ -68,7 +69,7 @@ with DAG(
check_events = OpenLineageTestOperator(
task_id="check_events",
- file_path=f"{os.getenv('AIRFLOW_HOME')}/dags/providers/tests/system/openlineage/example_openlineage_mapped_sensor.json",
+ file_path=str(Path(__file__).parent / "example_openlineage_mapped_sensor.json"),
allow_duplicate_events=True,
)