This is an automated email from the ASF dual-hosted git repository. mobuchowski pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push: new 7f1d54ad4b1 chore: remove deprecated bigquery facets from OpenLineage utils (#44838) 7f1d54ad4b1 is described below commit 7f1d54ad4b14a4b255b20dfb162ee3667341d6f6 Author: Kacper Muda <mudakac...@gmail.com> AuthorDate: Wed Dec 11 14:33:50 2024 +0100 chore: remove deprecated bigquery facets from OpenLineage utils (#44838) Signed-off-by: Kacper Muda <mudakac...@gmail.com> --- .../cloud/openlineage/BigQueryErrorRunFacet.json | 30 ---------------------- .../providers/google/cloud/openlineage/mixins.py | 14 +++------- .../providers/google/cloud/openlineage/utils.py | 22 ---------------- .../tests/google/cloud/openlineage/test_mixins.py | 6 ----- .../tests/google/cloud/operators/test_bigquery.py | 1 - 5 files changed, 3 insertions(+), 70 deletions(-) diff --git a/providers/src/airflow/providers/google/cloud/openlineage/BigQueryErrorRunFacet.json b/providers/src/airflow/providers/google/cloud/openlineage/BigQueryErrorRunFacet.json deleted file mode 100644 index 3213f9b8b2d..00000000000 --- a/providers/src/airflow/providers/google/cloud/openlineage/BigQueryErrorRunFacet.json +++ /dev/null @@ -1,30 +0,0 @@ -{ - "$schema": "https://json-schema.org/draft/2020-12/schema", - "$defs": { - "BigQueryErrorRunFacet": { - "allOf": [ - { - "$ref": "https://openlineage.io/spec/2-0-2/OpenLineage.json#/$defs/RunFacet" - }, - { - "type": "object", - "properties": { - "clientError": { - "type": "string" - }, - "parserError": { - "type": "string" - } - } - } - ], - "type": "object" - } - }, - "type": "object", - "properties": { - "bigQuery_error": { - "$ref": "#/$defs/BigQueryErrorRunFacet" - } - } - } diff --git a/providers/src/airflow/providers/google/cloud/openlineage/mixins.py b/providers/src/airflow/providers/google/cloud/openlineage/mixins.py index df8a0875f8c..ce7a14e03ae 100644 --- a/providers/src/airflow/providers/google/cloud/openlineage/mixins.py +++ b/providers/src/airflow/providers/google/cloud/openlineage/mixins.py @@ -108,10 +108,7 @@ class _BigQueryOpenLineageMixin: def get_facets(self, job_id: str): from airflow.providers.common.compat.openlineage.facet import ErrorMessageRunFacet - from airflow.providers.google.cloud.openlineage.utils import ( - BigQueryErrorRunFacet, - get_from_nullable_chain, - ) + from airflow.providers.google.cloud.openlineage.utils import get_from_nullable_chain inputs = [] outputs = [] @@ -125,8 +122,7 @@ class _BigQueryOpenLineageMixin: if get_from_nullable_chain(props, ["status", "state"]) != "DONE": raise ValueError(f"Trying to extract data from running bigquery job: `{job_id}`") - # TODO: remove bigQuery_job in next release - run_facets["bigQuery_job"] = run_facets["bigQueryJob"] = self._get_bigquery_job_run_facet(props) + run_facets["bigQueryJob"] = self._get_bigquery_job_run_facet(props) if get_from_nullable_chain(props, ["statistics", "numChildJobs"]): if hasattr(self, "log"): @@ -145,16 +141,12 @@ class _BigQueryOpenLineageMixin: if hasattr(self, "log"): self.log.warning("Cannot retrieve job details from BigQuery.Client. %s", e, exc_info=True) exception_msg = traceback.format_exc() - # TODO: remove BigQueryErrorRunFacet in next release run_facets.update( { "errorMessage": ErrorMessageRunFacet( message=f"{e}: {exception_msg}", programmingLanguage="python", - ), - "bigQuery_error": BigQueryErrorRunFacet( - clientError=f"{e}: {exception_msg}", - ), + ) } ) deduplicated_outputs = self._deduplicate_outputs(outputs) diff --git a/providers/src/airflow/providers/google/cloud/openlineage/utils.py b/providers/src/airflow/providers/google/cloud/openlineage/utils.py index a8989b4eb8f..d7852b13ecb 100644 --- a/providers/src/airflow/providers/google/cloud/openlineage/utils.py +++ b/providers/src/airflow/providers/google/cloud/openlineage/utils.py @@ -218,28 +218,6 @@ class BigQueryJobRunFacet(RunFacet): ) -# TODO: remove BigQueryErrorRunFacet in next release -@define -class BigQueryErrorRunFacet(RunFacet): - """ - Represents errors that can happen during execution of BigqueryExtractor. - - :param clientError: represents errors originating in bigquery client - :param parserError: represents errors that happened during parsing SQL provided to bigquery - """ - - clientError: str | None = field(default=None) - parserError: str | None = field(default=None) - - @staticmethod - def _get_schema() -> str: - return ( - "https://raw.githubusercontent.com/apache/airflow/" - f"providers-google/{provider_version}/airflow/providers/google/" - "openlineage/BigQueryErrorRunFacet.json" - ) - - def get_from_nullable_chain(source: Any, chain: list[str]) -> Any | None: """ Get object from nested structure of objects, where it's not guaranteed that all keys in the nested structure exist. diff --git a/providers/tests/google/cloud/openlineage/test_mixins.py b/providers/tests/google/cloud/openlineage/test_mixins.py index 41e4a22ee3f..fb047ddc2d1 100644 --- a/providers/tests/google/cloud/openlineage/test_mixins.py +++ b/providers/tests/google/cloud/openlineage/test_mixins.py @@ -83,9 +83,6 @@ class TestBigQueryOpenLineageMixin: self.job_details["configuration"]["query"].pop("query") assert lineage.run_facets == { - "bigQuery_job": BigQueryJobRunFacet( - cached=False, billedBytes=111149056, properties=json.dumps(self.job_details) - ), "bigQueryJob": BigQueryJobRunFacet( cached=False, billedBytes=111149056, properties=json.dumps(self.job_details) ), @@ -136,9 +133,6 @@ class TestBigQueryOpenLineageMixin: "bigQueryJob": BigQueryJobRunFacet( cached=False, billedBytes=120586240, properties=json.dumps(self.script_job_details) ), - "bigQuery_job": BigQueryJobRunFacet( - cached=False, billedBytes=120586240, properties=json.dumps(self.script_job_details) - ), "externalQuery": ExternalQueryRunFacet(externalQueryId="job_id", source="bigquery"), } assert lineage.inputs == [ diff --git a/providers/tests/google/cloud/operators/test_bigquery.py b/providers/tests/google/cloud/operators/test_bigquery.py index 29f3a8db13e..ab89443a69e 100644 --- a/providers/tests/google/cloud/operators/test_bigquery.py +++ b/providers/tests/google/cloud/operators/test_bigquery.py @@ -1614,7 +1614,6 @@ class TestBigQueryInsertJobOperator: ] assert lineage.run_facets == { - "bigQuery_job": mock.ANY, "bigQueryJob": mock.ANY, "externalQuery": ExternalQueryRunFacet(externalQueryId=mock.ANY, source="bigquery"), }