This is an automated email from the ASF dual-hosted git repository.
shahar1 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new a3a2b740c18 Support all bq load job and ext table config options in GCSToBigQueryOperator (#64505)
a3a2b740c18 is described below
commit a3a2b740c18a825dd7e0f7000976de6f7991aa74
Author: Miriam Lauter <[email protected]>
AuthorDate: Tue May 12 00:43:00 2026 -0400
Support all bq load job and ext table config options in GCSToBigQueryOperator (#64505)
---
.../google/cloud/transfers/gcs_to_bigquery.py | 43 +++-
.../google/cloud/transfers/test_gcs_to_bigquery.py | 259 ++++++++++++++++++---
2 files changed, 270 insertions(+), 32 deletions(-)
diff --git a/providers/google/src/airflow/providers/google/cloud/transfers/gcs_to_bigquery.py b/providers/google/src/airflow/providers/google/cloud/transfers/gcs_to_bigquery.py
index ffc88e77e21..cee73e5863b 100644
--- a/providers/google/src/airflow/providers/google/cloud/transfers/gcs_to_bigquery.py
+++ b/providers/google/src/airflow/providers/google/cloud/transfers/gcs_to_bigquery.py
@@ -20,6 +20,7 @@
from __future__ import annotations
import json
+import warnings
from collections.abc import Sequence
from typing import TYPE_CHECKING, Any
@@ -31,11 +32,11 @@ from google.cloud.bigquery import (
ExtractJob,
LoadJob,
QueryJob,
- SchemaField,
UnknownJob,
)
from google.cloud.bigquery.table import EncryptionConfiguration, Table,
TableReference
+from airflow.exceptions import AirflowProviderDeprecationWarning
from airflow.providers.common.compat.sdk import AirflowException, conf
from airflow.providers.google.cloud.hooks.bigquery import BigQueryHook,
BigQueryJob
from airflow.providers.google.cloud.hooks.gcs import GCSHook
@@ -136,7 +137,18 @@ class GCSToBigQueryOperator(BaseOperator):
future executions, you can pick up from the max ID.
:param schema_update_options: Allows the schema of the destination
table to be updated as a side effect of the load job.
- :param src_fmt_configs: configure optional fields specific to the source
format
+ :param src_fmt_configs: (Deprecated) configure optional fields specific to
the source format.
+ Use ``extra_config`` instead. Note when migrating that
``extra_config`` uses the fully-nested API
+ structure, so format-specific options must be nested under their
parent key
+ (e.g., ``{"parquetOptions": {"enableListInference": True}}`` rather
than
+ ``{"enableListInference": True}``).
+ :param extra_config: Dict of additional properties to apply over the
BigQuery job configuration.
+ When ``external_table=False``, applied over the load job configuration
+ (see `JobConfigurationLoad
<https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobConfigurationLoad>`_).
+ When ``external_table=True``, applied over the external table
configuration
+ (see `ExternalDataConfiguration
<https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#ExternalDataConfiguration>`_).
+ Applied after all top-level params, so keys here take precedence over
overlapping top-level
+ operator params. Nested dicts are replaced entirely, not deep-merged.
:param external_table: Flag to specify if the destination table should be
a BigQuery external table. Default Value is False.
:param time_partitioning: configure optional time partitioning fields i.e.
@@ -189,6 +201,7 @@ class GCSToBigQueryOperator(BaseOperator):
"destination_project_dataset_table",
"impersonation_chain",
"src_fmt_configs",
+ "extra_config",
)
template_ext: Sequence[str] = (".sql",)
ui_color = "#f0eee4"
@@ -219,6 +232,7 @@ class GCSToBigQueryOperator(BaseOperator):
gcp_conn_id="google_cloud_default",
schema_update_options=(),
src_fmt_configs=None,
+ extra_config: dict | None = None,
external_table=False,
time_partitioning=None,
range_partitioning=None,
@@ -289,6 +303,17 @@ class GCSToBigQueryOperator(BaseOperator):
self.schema_update_options = schema_update_options
self.src_fmt_configs = src_fmt_configs
+ if src_fmt_configs:
+ warnings.warn(
+ "The 'src_fmt_configs' parameter is deprecated. Use
'extra_config' instead. "
+ "Note: 'extra_config' uses the fully-nested API structure, so
format-specific "
+ "options must be nested under their parent key "
+ "(e.g., {'parquetOptions': {'enableListInference': True}}
rather than "
+ "{'enableListInference': True}).",
+ AirflowProviderDeprecationWarning,
+ stacklevel=2,
+ )
+ self.extra_config = extra_config
self.time_partitioning = time_partitioning
self.range_partitioning = range_partitioning
self.cluster_fields = cluster_fields
@@ -570,11 +595,15 @@ class GCSToBigQueryOperator(BaseOperator):
)
external_config_api_repr[src_fmt_to_param_mapping[self.source_format]] =
self.src_fmt_configs
- external_config =
ExternalConfig.from_api_repr(external_config_api_repr)
if self.schema_fields:
- external_config.schema = [SchemaField.from_api_repr(f) for f in
self.schema_fields]
+ external_config_api_repr["schema"] = {"fields": self.schema_fields}
if self.max_bad_records:
- external_config.max_bad_records = self.max_bad_records
+ external_config_api_repr["maxBadRecords"] = self.max_bad_records
+
+ if self.extra_config:
+ external_config_api_repr.update(self.extra_config)
+
+ external_config =
ExternalConfig.from_api_repr(external_config_api_repr)
# build table definition
table = Table(
@@ -728,6 +757,10 @@ class GCSToBigQueryOperator(BaseOperator):
if self.allow_jagged_rows:
self.configuration["load"]["allowJaggedRows"] =
self.allow_jagged_rows
+
+ if self.extra_config:
+ self.configuration["load"].update(self.extra_config)
+
return self.configuration
def _validate_src_fmt_configs(
diff --git a/providers/google/tests/unit/google/cloud/transfers/test_gcs_to_bigquery.py b/providers/google/tests/unit/google/cloud/transfers/test_gcs_to_bigquery.py
index 8d059692787..be2f5ed52e5 100644
--- a/providers/google/tests/unit/google/cloud/transfers/test_gcs_to_bigquery.py
+++ b/providers/google/tests/unit/google/cloud/transfers/test_gcs_to_bigquery.py
@@ -27,6 +27,7 @@ from google.cloud.bigquery import DEFAULT_RETRY, Table
from google.cloud.exceptions import Conflict
from sqlalchemy import select
+from airflow.exceptions import AirflowProviderDeprecationWarning
from airflow.models.trigger import Trigger
from airflow.providers.common.compat.openlineage.facet import (
ColumnLineageDatasetFacet,
@@ -1739,20 +1740,21 @@ class TestGCSToBigQueryOperator:
hook.return_value.generate_job_id.return_value = REAL_JOB_ID
hook.return_value.split_tablename.return_value = (PROJECT_ID, DATASET,
TABLE)
- operator = GCSToBigQueryOperator(
- task_id=TASK_ID,
- bucket=TEST_BUCKET,
- source_objects=TEST_SOURCE_OBJECTS,
- destination_project_dataset_table=TEST_EXPLICIT_DEST,
- schema_fields=SCHEMA_FIELDS,
- write_disposition=WRITE_DISPOSITION,
- external_table=True,
- project_id=JOB_PROJECT_ID,
- source_format="PARQUET",
- src_fmt_configs={
- "enableListInference": True,
- },
- )
+ with pytest.warns(AirflowProviderDeprecationWarning,
match="src_fmt_configs"):
+ operator = GCSToBigQueryOperator(
+ task_id=TASK_ID,
+ bucket=TEST_BUCKET,
+ source_objects=TEST_SOURCE_OBJECTS,
+ destination_project_dataset_table=TEST_EXPLICIT_DEST,
+ schema_fields=SCHEMA_FIELDS,
+ write_disposition=WRITE_DISPOSITION,
+ external_table=True,
+ project_id=JOB_PROJECT_ID,
+ source_format="PARQUET",
+ src_fmt_configs={
+ "enableListInference": True,
+ },
+ )
operator.execute(context=MagicMock())
@@ -1841,19 +1843,20 @@ class TestGCSToBigQueryOperator:
]
hook.return_value.generate_job_id.return_value = REAL_JOB_ID
hook.return_value.split_tablename.return_value = (PROJECT_ID, DATASET,
TABLE)
- operator = GCSToBigQueryOperator(
- task_id=TASK_ID,
- bucket=TEST_BUCKET,
- source_objects=TEST_SOURCE_OBJECTS,
- write_disposition=WRITE_DISPOSITION,
- destination_project_dataset_table=TEST_EXPLICIT_DEST,
- external_table=False,
- project_id=JOB_PROJECT_ID,
- source_format="PARQUET",
- src_fmt_configs={
- "enableListInference": True,
- },
- )
+ with pytest.warns(AirflowProviderDeprecationWarning,
match="src_fmt_configs"):
+ operator = GCSToBigQueryOperator(
+ task_id=TASK_ID,
+ bucket=TEST_BUCKET,
+ source_objects=TEST_SOURCE_OBJECTS,
+ write_disposition=WRITE_DISPOSITION,
+ destination_project_dataset_table=TEST_EXPLICIT_DEST,
+ external_table=False,
+ project_id=JOB_PROJECT_ID,
+ source_format="PARQUET",
+ src_fmt_configs={
+ "enableListInference": True,
+ },
+ )
operator.execute(context=MagicMock())
@@ -1888,6 +1891,208 @@ class TestGCSToBigQueryOperator:
hook.return_value.insert_job.assert_has_calls(calls)
+ @mock.patch(GCS_TO_BQ_PATH.format("BigQueryHook"))
+ def test_extra_config_is_merged_into_load_config(self, hook):
+ hook.return_value.insert_job.return_value =
MagicMock(job_id=REAL_JOB_ID, error_result=False)
+ hook.return_value.generate_job_id.return_value = REAL_JOB_ID
+ hook.return_value.split_tablename.return_value = (PROJECT_ID, DATASET,
TABLE)
+
+ operator = GCSToBigQueryOperator(
+ task_id=TASK_ID,
+ bucket=TEST_BUCKET,
+ source_objects=TEST_SOURCE_OBJECTS,
+ destination_project_dataset_table=TEST_EXPLICIT_DEST,
+ write_disposition=WRITE_DISPOSITION,
+ project_id=JOB_PROJECT_ID,
+ extra_config={"columnNameCharacterMap": "STRICT"},
+ )
+
+ operator.execute(context=MagicMock())
+
+ config = hook.return_value.insert_job.call_args[1]["configuration"]
+ assert config["load"]["columnNameCharacterMap"] == "STRICT"
+
+ @mock.patch(GCS_TO_BQ_PATH.format("BigQueryHook"))
+ def test_extra_config_takes_precedence_over_top_level_params(self, hook):
+ hook.return_value.insert_job.return_value =
MagicMock(job_id=REAL_JOB_ID, error_result=False)
+ hook.return_value.generate_job_id.return_value = REAL_JOB_ID
+ hook.return_value.split_tablename.return_value = (PROJECT_ID, DATASET,
TABLE)
+
+ operator = GCSToBigQueryOperator(
+ task_id=TASK_ID,
+ bucket=TEST_BUCKET,
+ source_objects=TEST_SOURCE_OBJECTS,
+ destination_project_dataset_table=TEST_EXPLICIT_DEST,
+ write_disposition=WRITE_DISPOSITION,
+ project_id=JOB_PROJECT_ID,
+ autodetect=True,
+ extra_config={"autodetect": False},
+ )
+
+ operator.execute(context=MagicMock())
+
+ config = hook.return_value.insert_job.call_args[1]["configuration"]
+ assert config["load"]["autodetect"] is False
+
+ @mock.patch(GCS_TO_BQ_PATH.format("BigQueryHook"))
+ def test_extra_config_with_nested_format_options(self, hook):
+ hook.return_value.insert_job.return_value =
MagicMock(job_id=REAL_JOB_ID, error_result=False)
+ hook.return_value.generate_job_id.return_value = REAL_JOB_ID
+ hook.return_value.split_tablename.return_value = (PROJECT_ID, DATASET,
TABLE)
+
+ operator = GCSToBigQueryOperator(
+ task_id=TASK_ID,
+ bucket=TEST_BUCKET,
+ source_objects=TEST_SOURCE_OBJECTS,
+ destination_project_dataset_table=TEST_EXPLICIT_DEST,
+ write_disposition=WRITE_DISPOSITION,
+ project_id=JOB_PROJECT_ID,
+ source_format="PARQUET",
+ extra_config={"parquetOptions": {"enableListInference": True}},
+ )
+
+ operator.execute(context=MagicMock())
+
+ config = hook.return_value.insert_job.call_args[1]["configuration"]
+ assert config["load"]["parquetOptions"] == {"enableListInference":
True}
+
+ @mock.patch(GCS_TO_BQ_PATH.format("BigQueryHook"))
+ def test_extra_config_is_merged_into_external_config(self, hook):
+ hook.return_value.generate_job_id.return_value = REAL_JOB_ID
+ hook.return_value.split_tablename.return_value = (PROJECT_ID, DATASET,
TABLE)
+
+ operator = GCSToBigQueryOperator(
+ task_id=TASK_ID,
+ bucket=TEST_BUCKET,
+ source_objects=TEST_SOURCE_OBJECTS,
+ destination_project_dataset_table=TEST_EXPLICIT_DEST,
+ schema_fields=SCHEMA_FIELDS,
+ write_disposition=WRITE_DISPOSITION,
+ external_table=True,
+ project_id=JOB_PROJECT_ID,
+ extra_config={"maxBadRecords": 10},
+ )
+
+ operator.execute(context=MagicMock())
+
+ table_resource =
hook.return_value.create_table.call_args[1]["table_resource"]
+ assert table_resource["externalDataConfiguration"]["maxBadRecords"] ==
10
+
+ @mock.patch(GCS_TO_BQ_PATH.format("BigQueryHook"))
+ def test_max_bad_records_set_from_top_level_param_in_external_table(self,
hook):
+ hook.return_value.generate_job_id.return_value = REAL_JOB_ID
+ hook.return_value.split_tablename.return_value = (PROJECT_ID, DATASET,
TABLE)
+
+ operator = GCSToBigQueryOperator(
+ task_id=TASK_ID,
+ bucket=TEST_BUCKET,
+ source_objects=TEST_SOURCE_OBJECTS,
+ destination_project_dataset_table=TEST_EXPLICIT_DEST,
+ schema_fields=SCHEMA_FIELDS,
+ write_disposition=WRITE_DISPOSITION,
+ external_table=True,
+ project_id=JOB_PROJECT_ID,
+ max_bad_records=5,
+ )
+
+ operator.execute(context=MagicMock())
+
+ table_resource =
hook.return_value.create_table.call_args[1]["table_resource"]
+ assert table_resource["externalDataConfiguration"]["maxBadRecords"] ==
5
+
+ @mock.patch(GCS_TO_BQ_PATH.format("BigQueryHook"))
+ def
test_extra_config_takes_precedence_over_max_bad_records_in_external_table(self,
hook):
+ hook.return_value.generate_job_id.return_value = REAL_JOB_ID
+ hook.return_value.split_tablename.return_value = (PROJECT_ID, DATASET,
TABLE)
+
+ operator = GCSToBigQueryOperator(
+ task_id=TASK_ID,
+ bucket=TEST_BUCKET,
+ source_objects=TEST_SOURCE_OBJECTS,
+ destination_project_dataset_table=TEST_EXPLICIT_DEST,
+ schema_fields=SCHEMA_FIELDS,
+ write_disposition=WRITE_DISPOSITION,
+ external_table=True,
+ project_id=JOB_PROJECT_ID,
+ max_bad_records=5,
+ extra_config={"maxBadRecords": 10},
+ )
+
+ operator.execute(context=MagicMock())
+
+ table_resource =
hook.return_value.create_table.call_args[1]["table_resource"]
+ assert table_resource["externalDataConfiguration"]["maxBadRecords"] ==
10
+
+ @mock.patch(GCS_TO_BQ_PATH.format("BigQueryHook"))
+ def
test_extra_config_takes_precedence_over_schema_fields_in_external_table(self,
hook):
+ hook.return_value.generate_job_id.return_value = REAL_JOB_ID
+ hook.return_value.split_tablename.return_value = (PROJECT_ID, DATASET,
TABLE)
+
+ override_schema = [{"name": "override_col", "type": "INTEGER", "mode":
"NULLABLE"}]
+ operator = GCSToBigQueryOperator(
+ task_id=TASK_ID,
+ bucket=TEST_BUCKET,
+ source_objects=TEST_SOURCE_OBJECTS,
+ destination_project_dataset_table=TEST_EXPLICIT_DEST,
+ schema_fields=SCHEMA_FIELDS,
+ write_disposition=WRITE_DISPOSITION,
+ external_table=True,
+ project_id=JOB_PROJECT_ID,
+ extra_config={"schema": {"fields": override_schema}},
+ )
+
+ operator.execute(context=MagicMock())
+
+ table_resource =
hook.return_value.create_table.call_args[1]["table_resource"]
+ assert table_resource["externalDataConfiguration"]["schema"]["fields"]
== override_schema
+
+ @mock.patch(GCS_TO_BQ_PATH.format("BigQueryHook"))
+ def test_src_fmt_configs_emits_deprecation_warning(self, hook):
+ hook.return_value.insert_job.return_value =
MagicMock(job_id=REAL_JOB_ID, error_result=False)
+ hook.return_value.generate_job_id.return_value = REAL_JOB_ID
+ hook.return_value.split_tablename.return_value = (PROJECT_ID, DATASET,
TABLE)
+
+ with pytest.warns(AirflowProviderDeprecationWarning,
match="src_fmt_configs"):
+ operator = GCSToBigQueryOperator(
+ task_id=TASK_ID,
+ bucket=TEST_BUCKET,
+ source_objects=TEST_SOURCE_OBJECTS,
+ destination_project_dataset_table=TEST_EXPLICIT_DEST,
+ write_disposition=WRITE_DISPOSITION,
+ project_id=JOB_PROJECT_ID,
+ src_fmt_configs={"skipLeadingRows": 1},
+ )
+
+ operator.execute(context=MagicMock())
+
+ config = hook.return_value.insert_job.call_args[1]["configuration"]
+ assert config["load"]["skipLeadingRows"] == 1
+
+ @mock.patch(GCS_TO_BQ_PATH.format("BigQueryHook"))
+ def
test_src_fmt_configs_and_extra_config_both_applied_with_precedence(self, hook):
+ hook.return_value.insert_job.return_value =
MagicMock(job_id=REAL_JOB_ID, error_result=False)
+ hook.return_value.generate_job_id.return_value = REAL_JOB_ID
+ hook.return_value.split_tablename.return_value = (PROJECT_ID, DATASET,
TABLE)
+
+ with pytest.warns(AirflowProviderDeprecationWarning,
match="src_fmt_configs"):
+ operator = GCSToBigQueryOperator(
+ task_id=TASK_ID,
+ bucket=TEST_BUCKET,
+ source_objects=TEST_SOURCE_OBJECTS,
+ destination_project_dataset_table=TEST_EXPLICIT_DEST,
+ write_disposition=WRITE_DISPOSITION,
+ project_id=JOB_PROJECT_ID,
+ src_fmt_configs={"skipLeadingRows": 1},
+ extra_config={"skipLeadingRows": 5, "columnNameCharacterMap":
"STRICT"},
+ )
+
+ operator.execute(context=MagicMock())
+
+ config = hook.return_value.insert_job.call_args[1]["configuration"]
+ # extra_config wins for overlapping key
+ assert config["load"]["skipLeadingRows"] == 5
+ assert config["load"]["columnNameCharacterMap"] == "STRICT"
+
@pytest.fixture
def create_task_instance(create_task_instance_of_operator, session):