This is an automated email from the ASF dual-hosted git repository.
mobuchowski pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 71532401a01 Small operator improvements in openlineage system tests (#68677)
71532401a01 is described below
commit 71532401a01f69a2465c49b670b80381cbd40808
Author: Kacper Muda <[email protected]>
AuthorDate: Thu Jun 18 11:18:41 2026 +0200
Small operator improvements in openlineage system tests (#68677)
---
...openlineage_trigger_failed_dag__af3_3_plus.json | 441 +++++++++++++++++++++
.../tests/system/openlineage/operator.py | 18 +-
2 files changed, 455 insertions(+), 4 deletions(-)
diff --git a/providers/openlineage/tests/system/openlineage/expected_events/openlineage_trigger_failed_dag__af3_3_plus.json b/providers/openlineage/tests/system/openlineage/expected_events/openlineage_trigger_failed_dag__af3_3_plus.json
new file mode 100644
index 00000000000..a32fcb3ba32
--- /dev/null
+++ b/providers/openlineage/tests/system/openlineage/expected_events/openlineage_trigger_failed_dag__af3_3_plus.json
@@ -0,0 +1,441 @@
+[
+ {
+ "eventType": "START",
+ "eventTime": "{{ is_datetime(result) }}",
+ "producer": "{{ regex_match(result,
\"^https:\\/\\/github.com/apache/airflow/tree/providers-openlineage\\/[\\d]+\\.[\\d]+\\.[\\d]+.*$\")
}}",
+ "schemaURL": "{{ regex_match(result,
\"^https:\\/\\/openlineage.io\\/spec\\/[\\d-]+\\/OpenLineage.json\\#\\/\\$defs\\/RunEvent$\")
}}",
+ "inputs": [],
+ "outputs": [],
+ "run": {
+ "runId": "{{ is_uuid(result) }}",
+ "facets": {
+ "airflowDagRun": {
+ "dag": {
+ "dag_id":
"openlineage_trigger_failed_dag_child__notrigger",
+ "fileloc": "{{
result.endswith('openlineage/example_openlineage_trigger_failed_dag.py') }}",
+ "owner": "airflow",
+ "owner_links": {},
+ "start_date": "{{ is_datetime(result) }}",
+ "tags": "{{ result[1:-1].split(', ') | sort ==
['\"with\\'quote\"', \"'first'\", \"'second@'\", '\\'z\"e\\''] }}",
+ "timetable": {}
+ },
+ "dagRun": {
+ "conf": {
+ "some_config": "value1",
+ "openlineage": {
+ "parentRunId":
"3bb703d1-09c1-4a42-8da5-35a0b3216072",
+ "parentJobNamespace": "prod_biz",
+ "parentJobName": "get_files",
+ "rootParentRunId":
"9d3b14f7-de91-40b6-aeef-e887e2c7673e",
+ "rootParentJobNamespace": "prod_analytics",
+ "rootParentJobName":
"generate_report_sales_e2e"
+ }
+ },
+ "dag_id":
"openlineage_trigger_failed_dag_child__notrigger",
+ "data_interval_end": "{{ is_datetime(result) }}",
+ "data_interval_start": "{{ is_datetime(result) }}",
+ "logical_date": "{{ is_datetime(result) }}",
+ "run_id": "{{
result.startswith('openlineage_trigger_failed_dag_triggering_child') }}",
+ "run_type": "operator_triggered",
+ "start_date": "{{ is_datetime(result) }}"
+ },
+ "_producer": "{{ regex_match(result,
\"^https:\\/\\/github.com/apache/airflow/tree/providers-openlineage\\/[\\d]+\\.[\\d]+\\.[\\d]+.*$\")
}}",
+ "_schemaURL": "{{ regex_match(result,
\"^https:\\/\\/openlineage.io\\/spec\\/[\\d-]+\\/OpenLineage.json\\#\\/\\$defs\\/RunFacet\")
}}"
+ },
+ "parent": {
+ "job": {
+ "namespace": "prod_biz",
+ "name": "get_files"
+ },
+ "run": {
+ "runId": "3bb703d1-09c1-4a42-8da5-35a0b3216072"
+ },
+ "root": {
+ "job": {
+ "name": "generate_report_sales_e2e",
+ "namespace": "prod_analytics"
+ },
+ "run": {
+ "runId": "9d3b14f7-de91-40b6-aeef-e887e2c7673e"
+ }
+ },
+ "_producer": "{{ regex_match(result,
\"^https:\\/\\/github.com/apache/airflow/tree/providers-openlineage\\/[\\d]+\\.[\\d]+\\.[\\d]+.*$\")
}}",
+ "_schemaURL": "{{ regex_match(result,
\"^https:\\/\\/openlineage.io\\/spec\\/facets\\/[\\d-]+\\/ParentRunFacet.json\\#\\/\\$defs\\/ParentRunFacet$\")
}}"
+ },
+ "nominalTime": {
+ "nominalEndTime": "{{ is_datetime(result) }}",
+ "nominalStartTime": "{{ is_datetime(result) }}",
+ "_producer": "{{ regex_match(result,
\"^https:\\/\\/github.com/apache/airflow/tree/providers-openlineage\\/[\\d]+\\.[\\d]+\\.[\\d]+.*$\")
}}",
+ "_schemaURL": "{{ regex_match(result,
\"^https:\\/\\/openlineage.io\\/spec\\/facets\\/[\\d-]+\\/NominalTimeRunFacet.json\\#\\/\\$defs\\/NominalTimeRunFacet$\")
}}"
+ },
+ "processing_engine": {
+ "name": "Airflow",
+ "openlineageAdapterVersion": "{{ regex_match(result,
\"^[\\d]+\\.[\\d]+\\.[\\d]+.*\") }}",
+ "version": "{{ regex_match(result,
\"^[\\d]+\\.[\\d]+\\.[\\d]+.*\") }}",
+ "_producer": "{{ regex_match(result,
\"^https:\\/\\/github.com/apache/airflow/tree/providers-openlineage\\/[\\d]+\\.[\\d]+\\.[\\d]+.*$\")
}}",
+ "_schemaURL": "{{ regex_match(result,
\"^https:\\/\\/openlineage.io\\/spec\\/facets\\/[\\d-]+\\/ProcessingEngineRunFacet.json\\#\\/\\$defs\\/ProcessingEngineRunFacet$\")
}}"
+ }
+ }
+ },
+ "job": {
+ "namespace": "{{ result is string }}",
+ "name": "openlineage_trigger_failed_dag_child__notrigger",
+ "facets": {
+ "documentation": {
+ "description": "MD DAG doc",
+ "contentType": "text/markdown",
+ "_producer": "{{ regex_match(result,
\"^https:\\/\\/github.com/apache/airflow/tree/providers-openlineage\\/[\\d]+\\.[\\d]+\\.[\\d]+.*$\")
}}",
+ "_schemaURL": "{{ regex_match(result,
\"^https:\\/\\/openlineage.io\\/spec\\/facets\\/[\\d-]+\\/DocumentationJobFacet.json\\#\\/\\$defs\\/DocumentationJobFacet\")
}}"
+ },
+ "jobType": {
+ "integration": "AIRFLOW",
+ "jobType": "DAG",
+ "processingType": "BATCH",
+ "_producer": "{{ regex_match(result,
\"^https:\\/\\/github.com/apache/airflow/tree/providers-openlineage\\/[\\d]+\\.[\\d]+\\.[\\d]+.*$\")
}}",
+ "_schemaURL": "{{ regex_match(result,
\"^https:\\/\\/openlineage.io\\/spec\\/facets\\/[\\d-]+\\/JobTypeJobFacet.json\\#\\/\\$defs\\/JobTypeJobFacet\")
}}"
+ },
+ "ownership": {
+ "owners": [
+ {
+ "name": "airflow"
+ }
+ ],
+ "_producer": "{{ regex_match(result,
\"^https:\\/\\/github.com/apache/airflow/tree/providers-openlineage\\/[\\d]+\\.[\\d]+\\.[\\d]+.*$\")
}}",
+ "_schemaURL": "{{ regex_match(result,
\"^https:\\/\\/openlineage.io\\/spec\\/facets\\/[\\d-]+\\/OwnershipJobFacet.json\\#\\/\\$defs\\/OwnershipJobFacet\")
}}"
+ },
+ "tags": {
+ "tags": [
+ {
+ "key": "first",
+ "value": "first",
+ "source": "AIRFLOW"
+ },
+ {
+ "key": "second@",
+ "value": "second@",
+ "source": "AIRFLOW"
+ },
+ {
+ "key": "with'quote",
+ "value": "with'quote",
+ "source": "AIRFLOW"
+ },
+ {
+ "key": "z\"e",
+ "value": "z\"e",
+ "source": "AIRFLOW"
+ }
+ ],
+ "_producer": "{{ regex_match(result,
\"^https:\\/\\/github.com/apache/airflow/tree/providers-openlineage\\/[\\d]+\\.[\\d]+\\.[\\d]+.*$\")
}}",
+ "_schemaURL": "{{ regex_match(result,
\"^https:\\/\\/openlineage.io\\/spec\\/facets\\/[\\d-]+\\/TagsJobFacet.json\\#\\/\\$defs\\/TagsJobFacet\")
}}"
+ },
+ "airflow": {
+ "taskGroups": {},
+ "taskTree": {},
+ "tasks": {
+ "failing_task": {
+ "downstream_task_ids": [],
+ "emits_ol_events": "{{ result == true }}",
+ "is_setup": false,
+ "is_teardown": false,
+ "operator":
"airflow.providers.standard.operators.bash.BashOperator",
+ "ui_label": "failing_task"
+ }
+ },
+ "_producer": "{{ regex_match(result,
\"^https:\\/\\/github.com/apache/airflow/tree/providers-openlineage\\/[\\d]+\\.[\\d]+\\.[\\d]+.*$\")
}}",
+ "_schemaURL": "{{ regex_match(result,
\"^https:\\/\\/openlineage.io\\/spec\\/[\\d-]+\\/OpenLineage.json\\#\\/\\$defs\\/JobFacet\")
}}"
+ }
+ }
+ }
+ },
+ {
+ "eventType": "FAIL",
+ "eventTime": "{{ is_datetime(result) }}",
+ "producer": "{{ regex_match(result,
\"^https:\\/\\/github.com/apache/airflow/tree/providers-openlineage\\/[\\d]+\\.[\\d]+\\.[\\d]+.*$\")
}}",
+ "schemaURL": "{{ regex_match(result,
\"^https:\\/\\/openlineage.io\\/spec\\/[\\d-]+\\/OpenLineage.json\\#\\/\\$defs\\/RunEvent$\")
}}",
+ "inputs": [],
+ "outputs": [],
+ "run": {
+ "runId": "{{ is_uuid(result) }}",
+ "facets": {
+ "errorMessage": {
+ "message": "task_failure",
+ "programmingLanguage": "python",
+ "stackTrace": null,
+ "_producer": "{{ regex_match(result,
\"^https:\\/\\/github.com/apache/airflow/tree/providers-openlineage\\/[\\d]+\\.[\\d]+\\.[\\d]+.*$\")
}}",
+ "_schemaURL": "{{ regex_match(result,
\"^https:\\/\\/openlineage.io\\/spec\\/facets\\/[\\d-]+\\/ErrorMessageRunFacet.json\\#\\/\\$defs\\/ErrorMessageRunFacet\")
}}"
+ },
+ "airflowState": {
+ "dagRunState": "failed",
+ "tasksState": {
+ "failing_task": "failed"
+ },
+ "tasksDuration": {
+ "failing_task": "{{ result is number }}"
+ },
+ "_producer": "{{ regex_match(result,
\"^https:\\/\\/github.com/apache/airflow/tree/providers-openlineage\\/[\\d]+\\.[\\d]+\\.[\\d]+.*$\")
}}",
+ "_schemaURL": "{{ regex_match(result,
\"^https:\\/\\/openlineage.io\\/spec\\/[\\d-]+\\/OpenLineage.json\\#\\/\\$defs\\/RunFacet\")
}}"
+ },
+ "airflowDagRun": {
+ "dag": {
+ "dag_id":
"openlineage_trigger_failed_dag_child__notrigger",
+ "fileloc": "{{
result.endswith('openlineage/example_openlineage_trigger_failed_dag.py') }}",
+ "owner": "airflow",
+ "owner_links": {},
+ "start_date": "{{ is_datetime(result) }}",
+ "tags": "{{ result[1:-1].split(', ') | sort ==
['\"with\\'quote\"', \"'first'\", \"'second@'\", '\\'z\"e\\''] }}",
+ "timetable": {}
+ },
+ "dagRun": {
+ "conf": {
+ "some_config": "value1",
+ "openlineage": {
+ "parentRunId":
"3bb703d1-09c1-4a42-8da5-35a0b3216072",
+ "parentJobNamespace": "prod_biz",
+ "parentJobName": "get_files",
+ "rootParentRunId":
"9d3b14f7-de91-40b6-aeef-e887e2c7673e",
+ "rootParentJobNamespace": "prod_analytics",
+ "rootParentJobName":
"generate_report_sales_e2e"
+ }
+ },
+ "dag_id":
"openlineage_trigger_failed_dag_child__notrigger",
+ "data_interval_end": "{{ is_datetime(result) }}",
+ "data_interval_start": "{{ is_datetime(result) }}",
+ "logical_date": "{{ is_datetime(result) }}",
+ "run_id": "{{
result.startswith('openlineage_trigger_failed_dag_triggering_child') }}",
+ "run_type": "operator_triggered",
+ "start_date": "{{ is_datetime(result) }}",
+ "duration": "{{ result is number }}"
+ },
+ "_producer": "{{ regex_match(result,
\"^https:\\/\\/github.com/apache/airflow/tree/providers-openlineage\\/[\\d]+\\.[\\d]+\\.[\\d]+.*$\")
}}",
+ "_schemaURL": "{{ regex_match(result,
\"^https:\\/\\/openlineage.io\\/spec\\/[\\d-]+\\/OpenLineage.json\\#\\/\\$defs\\/RunFacet\")
}}"
+ },
+ "parent": {
+ "job": {
+ "namespace": "prod_biz",
+ "name": "get_files"
+ },
+ "run": {
+ "runId": "3bb703d1-09c1-4a42-8da5-35a0b3216072"
+ },
+ "root": {
+ "job": {
+ "name": "generate_report_sales_e2e",
+ "namespace": "prod_analytics"
+ },
+ "run": {
+ "runId": "9d3b14f7-de91-40b6-aeef-e887e2c7673e"
+ }
+ },
+ "_producer": "{{ regex_match(result,
\"^https:\\/\\/github.com/apache/airflow/tree/providers-openlineage\\/[\\d]+\\.[\\d]+\\.[\\d]+.*$\")
}}",
+ "_schemaURL": "{{ regex_match(result,
\"^https:\\/\\/openlineage.io\\/spec\\/facets\\/[\\d-]+\\/ParentRunFacet.json\\#\\/\\$defs\\/ParentRunFacet$\")
}}"
+ },
+ "nominalTime": {
+ "nominalEndTime": "{{ is_datetime(result) }}",
+ "nominalStartTime": "{{ is_datetime(result) }}",
+ "_producer": "{{ regex_match(result,
\"^https:\\/\\/github.com/apache/airflow/tree/providers-openlineage\\/[\\d]+\\.[\\d]+\\.[\\d]+.*$\")
}}",
+ "_schemaURL": "{{ regex_match(result,
\"^https:\\/\\/openlineage.io\\/spec\\/facets\\/[\\d-]+\\/NominalTimeRunFacet.json\\#\\/\\$defs\\/NominalTimeRunFacet$\")
}}"
+ },
+ "processing_engine": {
+ "name": "Airflow",
+ "openlineageAdapterVersion": "{{ regex_match(result,
\"^[\\d]+\\.[\\d]+\\.[\\d]+.*\") }}",
+ "version": "{{ regex_match(result,
\"^[\\d]+\\.[\\d]+\\.[\\d]+.*\") }}",
+ "_producer": "{{ regex_match(result,
\"^https:\\/\\/github.com/apache/airflow/tree/providers-openlineage\\/[\\d]+\\.[\\d]+\\.[\\d]+.*$\")
}}",
+ "_schemaURL": "{{ regex_match(result,
\"^https:\\/\\/openlineage.io\\/spec\\/facets\\/[\\d-]+\\/ProcessingEngineRunFacet.json\\#\\/\\$defs\\/ProcessingEngineRunFacet$\")
}}"
+ }
+ }
+ },
+ "job": {
+ "namespace": "{{ result is string }}",
+ "name": "openlineage_trigger_failed_dag_child__notrigger",
+ "facets": {
+ "documentation": {
+ "description": "MD DAG doc",
+ "contentType": "text/markdown",
+ "_producer": "{{ regex_match(result,
\"^https:\\/\\/github.com/apache/airflow/tree/providers-openlineage\\/[\\d]+\\.[\\d]+\\.[\\d]+.*$\")
}}",
+ "_schemaURL": "{{ regex_match(result,
\"^https:\\/\\/openlineage.io\\/spec\\/facets\\/[\\d-]+\\/DocumentationJobFacet.json\\#\\/\\$defs\\/DocumentationJobFacet\")
}}"
+ },
+ "ownership": {
+ "owners": [
+ {
+ "name": "airflow"
+ }
+ ],
+ "_producer": "{{ regex_match(result,
\"^https:\\/\\/github.com/apache/airflow/tree/providers-openlineage\\/[\\d]+\\.[\\d]+\\.[\\d]+.*$\")
}}",
+ "_schemaURL": "{{ regex_match(result,
\"^https:\\/\\/openlineage.io\\/spec\\/facets\\/[\\d-]+\\/OwnershipJobFacet.json\\#\\/\\$defs\\/OwnershipJobFacet\")
}}"
+ },
+ "tags": {
+ "tags": [
+ {
+ "key": "first",
+ "value": "first",
+ "source": "AIRFLOW"
+ },
+ {
+ "key": "second@",
+ "value": "second@",
+ "source": "AIRFLOW"
+ },
+ {
+ "key": "with'quote",
+ "value": "with'quote",
+ "source": "AIRFLOW"
+ },
+ {
+ "key": "z\"e",
+ "value": "z\"e",
+ "source": "AIRFLOW"
+ }
+ ],
+ "_producer": "{{ regex_match(result,
\"^https:\\/\\/github.com/apache/airflow/tree/providers-openlineage\\/[\\d]+\\.[\\d]+\\.[\\d]+.*$\")
}}",
+ "_schemaURL": "{{ regex_match(result,
\"^https:\\/\\/openlineage.io\\/spec\\/facets\\/[\\d-]+\\/TagsJobFacet.json\\#\\/\\$defs\\/TagsJobFacet\")
}}"
+ },
+ "jobType": {
+ "integration": "AIRFLOW",
+ "jobType": "DAG",
+ "processingType": "BATCH",
+ "_producer": "{{ regex_match(result,
\"^https:\\/\\/github.com/apache/airflow/tree/providers-openlineage\\/[\\d]+\\.[\\d]+\\.[\\d]+.*$\")
}}",
+ "_schemaURL": "{{ regex_match(result,
\"^https:\\/\\/openlineage.io\\/spec\\/facets\\/[\\d-]+\\/JobTypeJobFacet.json\\#\\/\\$defs\\/JobTypeJobFacet\")
}}"
+ }
+ }
+ }
+ },
+ {
+ "eventType": "START",
+ "run": {
+ "facets": {
+ "parent": {
+ "job": {
+ "namespace": "{{ result is string }}",
+ "name":
"openlineage_trigger_failed_dag_child__notrigger"
+ },
+ "run": {
+ "runId": "{{ is_uuid(result) }}"
+ },
+ "root": {
+ "job": {
+ "name": "generate_report_sales_e2e",
+ "namespace": "prod_analytics"
+ },
+ "run": {
+ "runId": "9d3b14f7-de91-40b6-aeef-e887e2c7673e"
+ }
+ },
+ "_producer": "{{ regex_match(result,
\"^https:\\/\\/github.com/apache/airflow/tree/providers-openlineage\\/[\\d]+\\.[\\d]+\\.[\\d]+.*$\")
}}",
+ "_schemaURL": "{{ regex_match(result,
\"^https:\\/\\/openlineage.io\\/spec\\/facets\\/[\\d-]+\\/ParentRunFacet.json\\#\\/\\$defs\\/ParentRunFacet$\")
}}"
+ }
+ }
+ },
+ "job": {
+ "name":
"openlineage_trigger_failed_dag_child__notrigger.failing_task"
+ }
+ },
+ {
+ "eventType": "FAIL",
+ "run": {
+ "facets": {
+ "errorMessage": {
+ "message": "Bash command failed. The command returned a
non-zero exit code 1.",
+ "programmingLanguage": "python",
+ "_producer": "{{ regex_match(result,
\"^https:\\/\\/github.com/apache/airflow/tree/providers-openlineage\\/[\\d]+\\.[\\d]+\\.[\\d]+.*$\")
}}",
+ "_schemaURL": "{{ regex_match(result,
\"^https:\\/\\/openlineage.io\\/spec\\/facets\\/[\\d-]+\\/ErrorMessageRunFacet.json\\#\\/\\$defs\\/ErrorMessageRunFacet\")
}}"
+ },
+ "parent": {
+ "job": {
+ "namespace": "{{ result is string }}",
+ "name":
"openlineage_trigger_failed_dag_child__notrigger"
+ },
+ "run": {
+ "runId": "{{ is_uuid(result) }}"
+ },
+ "root": {
+ "job": {
+ "name": "generate_report_sales_e2e",
+ "namespace": "prod_analytics"
+ },
+ "run": {
+ "runId": "9d3b14f7-de91-40b6-aeef-e887e2c7673e"
+ }
+ },
+ "_producer": "{{ regex_match(result,
\"^https:\\/\\/github.com/apache/airflow/tree/providers-openlineage\\/[\\d]+\\.[\\d]+\\.[\\d]+.*$\")
}}",
+ "_schemaURL": "{{ regex_match(result,
\"^https:\\/\\/openlineage.io\\/spec\\/facets\\/[\\d-]+\\/ParentRunFacet.json\\#\\/\\$defs\\/ParentRunFacet$\")
}}"
+ }
+ }
+ },
+ "job": {
+ "name":
"openlineage_trigger_failed_dag_child__notrigger.failing_task"
+ }
+ },
+ {
+ "eventType": "START",
+ "run": {
+ "facets": {
+ "airflow": {
+ "task": {
+ "trigger_dag_id":
"openlineage_trigger_failed_dag_child__notrigger",
+ "trigger_run_id": "{{
result.startswith('openlineage_trigger_failed_dag_triggering_child_202') }}"
+ }
+ },
+ "parent": {
+ "job": {
+ "namespace": "{{ result is string }}",
+ "name": "openlineage_trigger_failed_dag"
+ },
+ "run": {
+ "runId": "{{ is_uuid(result) }}"
+ },
+ "root": {
+ "job": {
+ "name": "openlineage_trigger_failed_dag",
+ "namespace": "{{ result is string }}"
+ },
+ "run": {
+ "runId": "{{ is_uuid(result) }}"
+ }
+ },
+ "_producer": "{{ regex_match(result,
\"^https:\\/\\/github.com/apache/airflow/tree/providers-openlineage\\/[\\d]+\\.[\\d]+\\.[\\d]+.*$\")
}}",
+ "_schemaURL": "{{ regex_match(result,
\"^https:\\/\\/openlineage.io\\/spec\\/facets\\/[\\d-]+\\/ParentRunFacet.json\\#\\/\\$defs\\/ParentRunFacet$\")
}}"
+ }
+ }
+ },
+ "job": {
+ "name": "openlineage_trigger_failed_dag.trigger_dagrun"
+ }
+ },
+ {
+ "eventType": "FAIL",
+ "run": {
+ "facets": {
+ "airflow": {
+ "task": {
+ "trigger_dag_id":
"openlineage_trigger_failed_dag_child__notrigger",
+ "trigger_run_id": "{{
result.startswith('openlineage_trigger_failed_dag_triggering_child_202') }}"
+ }
+ },
+ "parent": {
+ "job": {
+ "namespace": "{{ result is string }}",
+ "name": "openlineage_trigger_failed_dag"
+ },
+ "run": {
+ "runId": "{{ is_uuid(result) }}"
+ },
+ "root": {
+ "job": {
+ "name": "openlineage_trigger_failed_dag",
+ "namespace": "{{ result is string }}"
+ },
+ "run": {
+ "runId": "{{ is_uuid(result) }}"
+ }
+ },
+ "_producer": "{{ regex_match(result,
\"^https:\\/\\/github.com/apache/airflow/tree/providers-openlineage\\/[\\d]+\\.[\\d]+\\.[\\d]+.*$\")
}}",
+ "_schemaURL": "{{ regex_match(result,
\"^https:\\/\\/openlineage.io\\/spec\\/facets\\/[\\d-]+\\/ParentRunFacet.json\\#\\/\\$defs\\/ParentRunFacet$\")
}}"
+ }
+ }
+ },
+ "job": {
+ "name": "openlineage_trigger_failed_dag.trigger_dagrun"
+ }
+ }
+]
diff --git a/providers/openlineage/tests/system/openlineage/operator.py b/providers/openlineage/tests/system/openlineage/operator.py
index a122a38f83d..802ac24320d 100644
--- a/providers/openlineage/tests/system/openlineage/operator.py
+++ b/providers/openlineage/tests/system/openlineage/operator.py
@@ -28,7 +28,7 @@ from typing import TYPE_CHECKING, Any
from urllib.parse import urlparse
from dateutil.parser import parse
-from jinja2 import Environment
+from jinja2 import Environment, TemplateError
from airflow.providers.common.compat.sdk import BaseOperator, Variable
@@ -196,6 +196,9 @@ def match(expected, result, env: Environment, path: list | None = None) -> bool:
elif isinstance(expected, list):
# Lists must match exactly in length and order. Each element is
compared recursively,
# so nested sentinels (null, $optional) work inside list items too.
+ if not isinstance(result, list):
+ log.error("Path `%s`: expected a list but got `%s` (%s)",
path_str, result, type(result).__name__)
+ return False
if len(expected) != len(result):
log.error("Path `%s`: expected %d item(s) but got %d", path_str,
len(expected), len(result))
return False
@@ -210,7 +213,7 @@ def match(expected, result, env: Environment, path: list | None = None) -> bool:
# (for expressions that transform the value, e.g. filters).
try:
rendered = env.from_string(expected).render(result=result)
- except ValueError as e:
+ except (ValueError, TemplateError) as e:
log.error("Path `%s`: failed to render template `%s`: %s",
path_str, expected, e)
return False
if str(rendered).lower() == "true" or rendered == result:
@@ -323,6 +326,8 @@ class OpenLineageTestOperator(BaseOperator):
self.clear_variables = clear_variables
self.fail_fast = fail_fast
self.event_sort_fn = event_sort_fn
+ if event_templates is None and file_path is None:
+ raise ValueError("Either event_templates or file_path must be
provided")
if self.event_templates and self.file_path:
raise ValueError("Can't pass both event_templates and file_path")
@@ -374,7 +379,12 @@ class OpenLineageTestOperator(BaseOperator):
log.info("Key `%s` absent as expected (%s).", key, reason)
return
- actual_events = Variable.get(key=key, deserialize_json=True)
+ try:
+ actual_events = Variable.get(key=key, deserialize_json=True)
+ except NoVariableError:
+ raise ValueError(
+ f"Expected events for key `{key}` but variable does not exist
(no events emitted)"
+ )
if not isinstance(actual_events, list):
raise ValueError(
f"Variable {key} does not contain a list of events, got
{type(actual_events).__name__}"
@@ -396,7 +406,7 @@ class OpenLineageTestOperator(BaseOperator):
if isinstance(template, list):
# Multiple expected events: compare each one after sorting.
for i, (tmpl, evt_str) in enumerate(zip(template, actual_events)):
- if not match(tmpl, json.loads(evt_str), self.env):
+ if not match(tmpl, json.loads(evt_str), self.env, [i]):
raise ValueError(f"Event at index {i} does not match
template for key `{key}`")
else:
# Last event is checked against the template