kacpermuda commented on code in PR #66849:
URL: https://github.com/apache/airflow/pull/66849#discussion_r3234781625


##########
providers/common/sql/src/airflow/providers/common/sql/operators/sql.py:
##########
@@ -300,14 +344,109 @@ def get_openlineage_facets_on_complete(self, 
task_instance) -> OperatorLineage |
             database_specific_lineage = None
 
         if database_specific_lineage is None:
-            return operator_lineage
+            if not self._check_results:
+                return operator_lineage
+            try:
+                return self._attach_check_facets(operator_lineage)
+            except AirflowOptionalProviderFeatureException as err:
+                self.log.debug("OpenLineage could not attach check facets: 
%s", err)
+                return operator_lineage
 
-        return OperatorLineage(
+        merged = OperatorLineage(
             inputs=operator_lineage.inputs + database_specific_lineage.inputs,
             outputs=operator_lineage.outputs + 
database_specific_lineage.outputs,
             run_facets=merge_dicts(operator_lineage.run_facets, 
database_specific_lineage.run_facets),
             job_facets=merge_dicts(operator_lineage.job_facets, 
database_specific_lineage.job_facets),
         )
+        if not self._check_results:
+            return merged
+        try:
+            return self._attach_check_facets(merged)
+        except AirflowOptionalProviderFeatureException as err:
+            self.log.debug("OpenLineage could not attach check facets: %s", 
err)
+            return merged
+
+    @require_openlineage_version(client_min_version="1.47.0")
+    def _attach_check_facets(self, operator_lineage: OperatorLineage) -> 
OperatorLineage:
+        """
+        Attach OpenLineage check-result facets to the given lineage object.
+
+        Requires openlineage-python >= 1.47.0, which introduced the extended 
``Assertion``
+        and ``TestExecution`` schemas (``name``, ``description``, 
``expected``, ``actual``, ``content``,
+        ``params`` fields).  The decorator raises 
``AirflowOptionalProviderFeatureException`` when
+        the client is absent or too old; callers are expected to catch that 
exception.
+
+        Results with a ``table`` set are attached 
as ``DataQualityAssertionsDatasetFacet`` on the matching
+        dataset (matched by suffix).  Unmatched results, and results without a 
table, fall back
+        to a run-level ``TestRunFacet``.
+        """
+        from openlineage.client.facet_v2 import 
data_quality_assertions_dataset, test_run
+
+        by_table: dict[str | None, list[SQLCheckResult]] = {}
+        for r in self._check_results:
+            by_table.setdefault(r.table, []).append(r)
+
+        run_level: list[SQLCheckResult] = list(by_table.pop(None, []))
+
+        for table, results in by_table.items():
+            assertions = [
+                data_quality_assertions_dataset.Assertion(
+                    assertion=r.check_type,
+                    success=r.success,
+                    severity=r.severity,
+                    column=r.column,
+                    name=r.name,
+                    description=r.description,
+                    expected=r.expected,
+                    actual=r.actual,
+                    content=r.content,
+                    contentType="sql",
+                    params=r.params,
+                )
+                for r in results
+            ]
+            matched = False
+            for dataset in [*operator_lineage.inputs, 
*operator_lineage.outputs]:

Review Comment:
   I'm not sure this scenario is possible in any of the currently supported 
operators (most check operators only check and never write to the table). 
Still, the OL implementation here can be future-proofed, so it's definitely a 
good idea to think about it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to