mobuchowski commented on code in PR #66849:
URL: https://github.com/apache/airflow/pull/66849#discussion_r3235447841
##########
providers/common/sql/src/airflow/providers/common/sql/operators/sql.py:
##########
@@ -300,14 +344,116 @@ def get_openlineage_facets_on_complete(self,
task_instance) -> OperatorLineage |
database_specific_lineage = None
if database_specific_lineage is None:
- return operator_lineage
+ if not self._check_results:
+ return operator_lineage
+ try:
+ return self._attach_check_facets(operator_lineage)
+ except AirflowOptionalProviderFeatureException as err:
+ self.log.debug("OpenLineage could not attach check facets:
%s", err)
+ return operator_lineage
- return OperatorLineage(
+ merged = OperatorLineage(
inputs=operator_lineage.inputs + database_specific_lineage.inputs,
outputs=operator_lineage.outputs +
database_specific_lineage.outputs,
run_facets=merge_dicts(operator_lineage.run_facets,
database_specific_lineage.run_facets),
job_facets=merge_dicts(operator_lineage.job_facets,
database_specific_lineage.job_facets),
)
+ if not self._check_results:
+ return merged
+ try:
+ return self._attach_check_facets(merged)
+ except AirflowOptionalProviderFeatureException as err:
+ self.log.debug("OpenLineage could not attach check facets: %s",
err)
+ return merged
+
+ @require_openlineage_version(client_min_version="1.47.0")
+ def _attach_check_facets(self, operator_lineage: OperatorLineage) ->
OperatorLineage:
+ """
+ Attach OpenLineage check-result facets to the given lineage object.
+
+ Requires openlineage-python >= 1.47.0, which introduced the extended
``Assertion``
+ and ``TestExecution`` schemas (``name``, ``description``,
``expected``, ``actual``, ``content``,
+ ``params`` fields). The decorator raises
``AirflowOptionalProviderFeatureException`` when
+ the client is absent or too old; callers are expected to catch that
exception.
+
+ Results with a ``table`` set are attached
as``DataQualityAssertionsDatasetFacet`` on the matching
+ dataset (matched by suffix). Unmatched results, and results without a
table, fall back
+ to a run-level ``TestRunFacet``.
+ """
+ from openlineage.client.facet_v2 import
data_quality_assertions_dataset, test_run
+
+ by_table: dict[str | None, list[SQLCheckResult]] = {}
+ for r in self._check_results:
+ by_table.setdefault(r.table, []).append(r)
+
+ run_level: list[SQLCheckResult] = list(by_table.pop(None, []))
+
+ for table, results in by_table.items():
+ assertions = [
+ data_quality_assertions_dataset.Assertion(
+ assertion=r.check_type,
+ success=r.success,
+ severity=r.severity,
+ column=r.column,
+ name=r.name,
+ description=r.description,
+ expected=r.expected,
+ actual=r.actual,
+ content=r.content,
+ contentType="sql",
+ params=r.params,
+ )
+ for r in results
+ ]
+ table_lower = table.lower() # type: ignore[union-attr]
+ matched = False
+ for group in (operator_lineage.inputs, operator_lineage.outputs):
+ # Exact match takes priority over suffix match when both are
present in the same group.
+ target = next(
+ (ds for ds in group if ds.name.lower() == table_lower),
+ None,
+ ) or next(
+ (ds for ds in group if
ds.name.lower().endswith(f".{table_lower}")),
+ None,
+ )
+ if target is not None:
+ target.facets = target.facets or {}
+ existing = target.facets.get("dataQualityAssertions")
+ facet_assertions = (
+ existing.assertions + assertions if existing is not
None else assertions
+ )
+ target.facets["dataQualityAssertions"] = (
+
data_quality_assertions_dataset.DataQualityAssertionsDatasetFacet(
+ assertions=facet_assertions
+ )
+ )
+ matched = True
+ if not matched:
+ run_level.extend(results)
+
+ if run_level:
+ tests = [
+ test_run.TestExecution(
+ name=r.name,
+ status="pass" if r.success else "fail",
+ severity=r.severity,
+ type=r.check_type,
+ description=r.description,
+ expected=r.expected,
+ actual=r.actual,
+ content=r.content,
+ contentType="sql",
+ params={"tested_column": r.column, "tested_table":
r.table, **(r.params or {})},
+ )
+ for r in run_level
+ ]
+ operator_lineage.run_facets = operator_lineage.run_facets or {}
+ existing = operator_lineage.run_facets.get("testRunFacet")
+ if existing is not None:
+ tests = existing.tests + tests
+ operator_lineage.run_facets["testRunFacet"] =
test_run.TestRunFacet(tests=tests)
Review Comment:
Use just `test` as the key - we don't repeat the "RunFacet" part with standard
facets. This happens in a few places in this PR.
##########
providers/common/sql/src/airflow/providers/common/sql/operators/sql.py:
##########
@@ -554,6 +700,9 @@ def execute(self, context: Context):
self.column_mapping[column][check], result, tolerance
)
+ # Save check results before raising exception, to be used by listeners
+ self._check_results = self._build_check_results()
Review Comment:
Maybe just `self.check_result` if it's not _internal_ - we can make it
explicit if we just add it now
##########
providers/common/sql/src/airflow/providers/common/sql/operators/sql.py:
##########
@@ -127,6 +130,46 @@ def default_output_processor(results: list[Any],
descriptions: list[Sequence[Seq
return results
+@dataclass
+class SQLCheckResult:
+ """Record of a single SQL check result."""
+
+ name: str
+ """Unique name identifying this check."""
+
+ check_type: str
+ """Classification of the check, e.g. ``"not_null"``, ``"row_count"``,
``"unique"``."""
Review Comment:
Do we have a row count check somewhere?
Not saying we need to remove it, but I'm curious in the context of this
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]