This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 23264fb820 Fix for issue with reading schema fields for JSON files in
GCSToBigQueryOperator (#28284)
23264fb820 is described below
commit 23264fb820c179e9951ea9706f68b13a9b3fdbc0
Author: VladaZakharova <[email protected]>
AuthorDate: Wed Dec 21 20:39:12 2022 +0100
Fix for issue with reading schema fields for JSON files in
GCSToBigQueryOperator (#28284)
---
.../google/cloud/transfers/gcs_to_bigquery.py | 447 +++--
.../providers/google/cloud/triggers/bigquery.py | 2 +
.../google/cloud/transfers/test_gcs_to_bigquery.py | 1719 ++++++++------------
.../cloud/gcs/example_gcs_to_bigquery_async.py | 70 +-
4 files changed, 1026 insertions(+), 1212 deletions(-)
diff --git a/airflow/providers/google/cloud/transfers/gcs_to_bigquery.py b/airflow/providers/google/cloud/transfers/gcs_to_bigquery.py
index 63c625be87..21b3e3d865 100644
--- a/airflow/providers/google/cloud/transfers/gcs_to_bigquery.py
+++ b/airflow/providers/google/cloud/transfers/gcs_to_bigquery.py
@@ -21,17 +21,22 @@ from __future__ import annotations
import json
from typing import TYPE_CHECKING, Any, Sequence
-from google.api_core.exceptions import Conflict
+from google.api_core.exceptions import BadRequest, Conflict
from google.api_core.retry import Retry
-from google.cloud.bigquery import DEFAULT_RETRY, CopyJob, ExtractJob, LoadJob, QueryJob
+from google.cloud.bigquery import (
+ DEFAULT_RETRY,
+ CopyJob,
+ ExternalConfig,
+ ExtractJob,
+ LoadJob,
+ QueryJob,
+ SchemaField,
+)
+from google.cloud.bigquery.table import EncryptionConfiguration, Table, TableReference
from airflow import AirflowException
from airflow.models import BaseOperator
-from airflow.providers.google.cloud.hooks.bigquery import (
- BigQueryHook,
- BigQueryJob,
- _cleanse_time_partitioning,
-)
+from airflow.providers.google.cloud.hooks.bigquery import BigQueryHook, BigQueryJob
from airflow.providers.google.cloud.hooks.gcs import GCSHook
from airflow.providers.google.cloud.links.bigquery import BigQueryTableLink
from airflow.providers.google.cloud.triggers.bigquery import BigQueryInsertJobTrigger
@@ -39,6 +44,15 @@ from airflow.providers.google.cloud.triggers.bigquery import BigQueryInsertJobTr
if TYPE_CHECKING:
from airflow.utils.context import Context
+ALLOWED_FORMATS = [
+ "CSV",
+ "NEWLINE_DELIMITED_JSON",
+ "AVRO",
+ "GOOGLE_SHEETS",
+ "DATASTORE_BACKUP",
+ "PARQUET",
+]
+
class GCSToBigQueryOperator(BaseOperator):
"""
@@ -233,7 +247,13 @@ class GCSToBigQueryOperator(BaseOperator):
# BQ config
self.destination_project_dataset_table =
destination_project_dataset_table
self.schema_fields = schema_fields
- self.source_format = source_format
+ if source_format.upper() not in ALLOWED_FORMATS:
+ raise ValueError(
+ f"{source_format} is not a valid source format. "
+ f"Please use one of the following types: {ALLOWED_FORMATS}."
+ )
+ else:
+ self.source_format = source_format.upper()
self.compression = compression
self.create_disposition = create_disposition
self.skip_leading_rows = skip_leading_rows
@@ -300,6 +320,8 @@ class GCSToBigQueryOperator(BaseOperator):
impersonation_chain=self.impersonation_chain,
)
self.hook = hook
+ self.source_format = self.source_format.upper()
+
job_id = self.hook.generate_job_id(
job_id=self.job_id,
dag_id=self.dag_id,
@@ -312,113 +334,44 @@ class GCSToBigQueryOperator(BaseOperator):
self.source_objects = (
self.source_objects if isinstance(self.source_objects, list) else
[self.source_objects]
)
- source_uris = [f"gs://{self.bucket}/{source_object}" for source_object
in self.source_objects]
+ self.source_uris = [f"gs://{self.bucket}/{source_object}" for
source_object in self.source_objects]
- if not self.schema_fields and self.schema_object and
self.source_format != "DATASTORE_BACKUP":
- gcs_hook = GCSHook(
- gcp_conn_id=self.gcp_conn_id,
- delegate_to=self.delegate_to,
- impersonation_chain=self.impersonation_chain,
- )
- self.schema_fields = json.loads(
- gcs_hook.download(self.schema_object_bucket,
self.schema_object).decode("utf-8")
- )
- self.log.info("Autodetected fields from schema object: %s",
self.schema_fields)
+ if not self.schema_fields:
+ if not self.schema_object and not self.autodetect:
+ raise AirflowException(
+ "Table schema was not found. Neither schema object nor
schema fields were specified"
+ )
+ if self.schema_object and self.source_format != "DATASTORE_BACKUP":
+ gcs_hook = GCSHook(
+ gcp_conn_id=self.gcp_conn_id,
+ delegate_to=self.delegate_to,
+ impersonation_chain=self.impersonation_chain,
+ )
+ self.schema_fields = json.loads(
+ gcs_hook.download(self.schema_object_bucket,
self.schema_object).decode("utf-8")
+ )
+ self.log.info("Loaded fields from schema object: %s",
self.schema_fields)
+ else:
+ self.schema_fields = None
if self.external_table:
self.log.info("Creating a new BigQuery table for storing data...")
- project_id, dataset_id, table_id = self.hook.split_tablename(
- table_input=self.destination_project_dataset_table,
- default_project_id=self.hook.project_id or "",
- )
- table_resource = {
- "tableReference": {
- "projectId": project_id,
- "datasetId": dataset_id,
- "tableId": table_id,
- },
- "labels": self.labels,
- "description": self.description,
- "externalDataConfiguration": {
- "source_uris": source_uris,
- "source_format": self.source_format,
- "maxBadRecords": self.max_bad_records,
- "autodetect": self.autodetect,
- "compression": self.compression,
- "csvOptions": {
- "fieldDelimeter": self.field_delimiter,
- "skipLeadingRows": self.skip_leading_rows,
- "quote": self.quote_character,
- "allowQuotedNewlines": self.allow_quoted_newlines,
- "allowJaggedRows": self.allow_jagged_rows,
- },
- },
- "location": self.location,
- "encryptionConfiguration": self.encryption_configuration,
- }
- table_resource_checked_schema =
self._check_schema_fields(table_resource)
- table = self.hook.create_empty_table(
- table_resource=table_resource_checked_schema,
- )
- max_id = self._find_max_value_in_column()
+ table_obj_api_repr = self._create_empty_table()
+
BigQueryTableLink.persist(
context=context,
task_instance=self,
- dataset_id=table.to_api_repr()["tableReference"]["datasetId"],
- project_id=table.to_api_repr()["tableReference"]["projectId"],
- table_id=table.to_api_repr()["tableReference"]["tableId"],
+ dataset_id=table_obj_api_repr["tableReference"]["datasetId"],
+ project_id=table_obj_api_repr["tableReference"]["projectId"],
+ table_id=table_obj_api_repr["tableReference"]["tableId"],
)
- return max_id
+ if self.max_id_key:
+ max_id = self._find_max_value_in_column()
+ return max_id
else:
self.log.info("Using existing BigQuery table for storing data...")
- destination_project, destination_dataset, destination_table =
self.hook.split_tablename(
- table_input=self.destination_project_dataset_table,
- default_project_id=self.hook.project_id or "",
- var_name="destination_project_dataset_table",
- )
- self.configuration = {
- "load": {
- "autodetect": self.autodetect,
- "createDisposition": self.create_disposition,
- "destinationTable": {
- "projectId": destination_project,
- "datasetId": destination_dataset,
- "tableId": destination_table,
- },
- "destinationTableProperties": {
- "description": self.description,
- "labels": self.labels,
- },
- "sourceFormat": self.source_format,
- "skipLeadingRows": self.skip_leading_rows,
- "sourceUris": source_uris,
- "writeDisposition": self.write_disposition,
- "ignoreUnknownValues": self.ignore_unknown_values,
- "allowQuotedNewlines": self.allow_quoted_newlines,
- "encoding": self.encoding,
- "allowJaggedRows": self.allow_jagged_rows,
- "fieldDelimiter": self.field_delimiter,
- "maxBadRecords": self.max_bad_records,
- "quote": self.quote_character,
- "schemaUpdateOptions": self.schema_update_options,
- },
- }
- if self.cluster_fields:
- self.configuration["load"].update({"clustering": {"fields":
self.cluster_fields}})
- time_partitioning = _cleanse_time_partitioning(
- self.destination_project_dataset_table, self.time_partitioning
- )
- if time_partitioning:
- self.configuration["load"].update({"timePartitioning":
time_partitioning})
- # fields that should only be set if defined
- set_if_def = {
- "quote": self.quote_character,
- "destinationEncryptionConfiguration":
self.encryption_configuration,
- }
- for k, v in set_if_def.items():
- if v:
- self.configuration["load"][k] = v
- self.configuration = self._check_schema_fields(self.configuration)
+ self.configuration = self._use_existing_table()
+
try:
self.log.info("Executing: %s", self.configuration)
job = self._submit_job(self.hook, job_id)
@@ -480,9 +433,9 @@ class GCSToBigQueryOperator(BaseOperator):
)
else:
job.result(timeout=self.result_timeout,
retry=self.result_retry)
- max_id = self._find_max_value_in_column()
self._handle_job_error(job)
- return max_id
+ if self.max_id_key:
+ return self._find_max_value_in_column()
def execute_complete(self, context: Context, event: dict[str, Any]):
"""
@@ -512,7 +465,6 @@ class GCSToBigQueryOperator(BaseOperator):
f"SELECT MAX({self.max_id_key}) AS max_value "
f"FROM {self.destination_project_dataset_table}"
)
-
self.configuration = {
"query": {
"query": select_command,
@@ -520,8 +472,17 @@ class GCSToBigQueryOperator(BaseOperator):
"schemaUpdateOptions": [],
}
}
- job_id = hook.insert_job(configuration=self.configuration,
project_id=hook.project_id)
- rows = list(hook.get_job(job_id=job_id,
location=self.location).result())
+ try:
+ job_id = hook.insert_job(configuration=self.configuration,
project_id=hook.project_id)
+ rows = list(hook.get_job(job_id=job_id,
location=self.location).result())
+ except BadRequest as e:
+ if "Unrecognized name:" in e.message:
+ raise AirflowException(
+ f"Could not determine MAX value in column
{self.max_id_key} "
+ f"since the default value of 'string_field_n' was set
by BQ"
+ )
+ else:
+ raise AirflowException(e.message)
if rows:
for row in rows:
max_id = row[0] if row[0] else 0
@@ -535,53 +496,231 @@ class GCSToBigQueryOperator(BaseOperator):
else:
raise RuntimeError(f"The {select_command} returned no rows!")
- def _check_schema_fields(self, table_resource):
- """
- Helper method to detect schema fields if they were not specified by
user and autodetect=True.
- If source_objects were passed, method reads the second row in CSV
file. If there is at least one digit
- table_resurce is returned without changes so that BigQuery can
determine schema_fields in the
- next step.
- If there are only characters, the first row with fields is used to
construct schema_fields argument
- with type 'STRING'. Table_resource is updated with new schema_fileds
key and returned back to operator
- :param table_resource: Configuration or table_resource dictionary
- :return: table_resource: Updated table_resource dict with schema_fields
- """
- if not self.autodetect and not self.schema_fields:
- raise RuntimeError(
- "Table schema was not found. Set autodetect=True to "
- "automatically set schema fields from source objects or pass "
- "schema_fields explicitly"
+ def _create_empty_table(self):
+ project_id, dataset_id, table_id = self.hook.split_tablename(
+ table_input=self.destination_project_dataset_table,
+ default_project_id=self.hook.project_id or "",
+ )
+
+ external_config_api_repr = {
+ "autodetect": self.autodetect,
+ "sourceFormat": self.source_format,
+ "sourceUris": self.source_uris,
+ "compression": self.compression.upper(),
+ "ignoreUnknownValues": self.ignore_unknown_values,
+ }
+ # if following fields are not specified in src_fmt_configs,
+ # honor the top-level params for backward-compatibility
+ backward_compatibility_configs = {
+ "skipLeadingRows": self.skip_leading_rows,
+ "fieldDelimiter": self.field_delimiter,
+ "quote": self.quote_character,
+ "allowQuotedNewlines": self.allow_quoted_newlines,
+ "allowJaggedRows": self.allow_jagged_rows,
+ "encoding": self.encoding,
+ }
+ src_fmt_to_param_mapping = {"CSV": "csvOptions", "GOOGLE_SHEETS":
"googleSheetsOptions"}
+ src_fmt_to_configs_mapping = {
+ "csvOptions": [
+ "allowJaggedRows",
+ "allowQuotedNewlines",
+ "fieldDelimiter",
+ "skipLeadingRows",
+ "quote",
+ "encoding",
+ ],
+ "googleSheetsOptions": ["skipLeadingRows"],
+ }
+ if self.source_format in src_fmt_to_param_mapping.keys():
+ valid_configs =
src_fmt_to_configs_mapping[src_fmt_to_param_mapping[self.source_format]]
+ self.src_fmt_configs = self._validate_src_fmt_configs(
+ self.source_format, self.src_fmt_configs, valid_configs,
backward_compatibility_configs
)
- elif not self.schema_fields:
- for source_object in self.source_objects:
- gcs_hook = GCSHook(
- gcp_conn_id=self.gcp_conn_id,
- delegate_to=self.delegate_to,
- impersonation_chain=self.impersonation_chain,
- )
- blob = gcs_hook.download(
- bucket_name=self.schema_object_bucket,
- object_name=source_object,
+
external_config_api_repr[src_fmt_to_param_mapping[self.source_format]] =
self.src_fmt_configs
+
+ external_config =
ExternalConfig.from_api_repr(external_config_api_repr)
+ if self.schema_fields:
+ external_config.schema = [SchemaField.from_api_repr(f) for f in
self.schema_fields]
+ if self.max_bad_records:
+ external_config.max_bad_records = self.max_bad_records
+
+ # build table definition
+ table = Table(
+
table_ref=TableReference.from_string(self.destination_project_dataset_table,
project_id)
+ )
+ table.external_data_configuration = external_config
+ if self.labels:
+ table.labels = self.labels
+
+ if self.description:
+ table.description = self.description
+
+ if self.encryption_configuration:
+ table.encryption_configuration =
EncryptionConfiguration.from_api_repr(
+ self.encryption_configuration
+ )
+ table_obj_api_repr = table.to_api_repr()
+
+ self.log.info("Creating external table: %s",
self.destination_project_dataset_table)
+ self.hook.create_empty_table(
+ table_resource=table_obj_api_repr, project_id=project_id,
location=self.location, exists_ok=True
+ )
+ self.log.info("External table created successfully: %s",
self.destination_project_dataset_table)
+ return table_obj_api_repr
+
+ def _use_existing_table(self):
+ destination_project, destination_dataset, destination_table =
self.hook.split_tablename(
+ table_input=self.destination_project_dataset_table,
+ default_project_id=self.hook.project_id or "",
+ var_name="destination_project_dataset_table",
+ )
+
+ # bigquery also allows you to define how you want a table's schema to
change
+ # as a side effect of a load
+ # for more details:
+ #
https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs#configuration.load.schemaUpdateOptions
+ allowed_schema_update_options = ["ALLOW_FIELD_ADDITION",
"ALLOW_FIELD_RELAXATION"]
+ if not
set(allowed_schema_update_options).issuperset(set(self.schema_update_options)):
+ raise ValueError(
+ f"{self.schema_update_options} contains invalid schema update
options. "
+ f"Please only use one or more of the following options:
{allowed_schema_update_options}"
+ )
+
+ self.configuration = {
+ "load": {
+ "autodetect": self.autodetect,
+ "createDisposition": self.create_disposition,
+ "destinationTable": {
+ "projectId": destination_project,
+ "datasetId": destination_dataset,
+ "tableId": destination_table,
+ },
+ "sourceFormat": self.source_format,
+ "sourceUris": self.source_uris,
+ "writeDisposition": self.write_disposition,
+ "ignoreUnknownValues": self.ignore_unknown_values,
+ },
+ }
+ self.time_partitioning = self._cleanse_time_partitioning(
+ self.destination_project_dataset_table, self.time_partitioning
+ )
+ if self.time_partitioning:
+ self.configuration["load"].update({"timePartitioning":
self.time_partitioning})
+
+ if self.cluster_fields:
+ self.configuration["load"].update({"clustering": {"fields":
self.cluster_fields}})
+
+ if self.schema_fields:
+ self.configuration["load"]["schema"] = {"fields":
self.schema_fields}
+
+ if self.schema_update_options:
+ if self.write_disposition not in ["WRITE_APPEND",
"WRITE_TRUNCATE"]:
+ raise ValueError(
+ "schema_update_options is only "
+ "allowed if write_disposition is "
+ "'WRITE_APPEND' or 'WRITE_TRUNCATE'."
)
- fields, values = [item.split(",") for item in
blob.decode("utf-8").splitlines()][:2]
- import re
+ else:
+ # To provide backward compatibility
+ self.schema_update_options = list(self.schema_update_options
or [])
+ self.log.info("Adding experimental 'schemaUpdateOptions': %s",
self.schema_update_options)
+ self.configuration["load"]["schemaUpdateOptions"] =
self.schema_update_options
+
+ if self.max_bad_records:
+ self.configuration["load"]["maxBadRecords"] = self.max_bad_records
+
+ if self.encryption_configuration:
+ self.configuration["load"]["destinationEncryptionConfiguration"] =
self.encryption_configuration
+
+ if self.labels or self.description:
+ self.configuration["load"].update({"destinationTableProperties":
{}})
+ if self.labels:
+
self.configuration["load"]["destinationTableProperties"]["labels"] = self.labels
+ if self.description:
+
self.configuration["load"]["destinationTableProperties"]["description"] =
self.description
+
+ src_fmt_to_configs_mapping = {
+ "CSV": [
+ "allowJaggedRows",
+ "allowQuotedNewlines",
+ "autodetect",
+ "fieldDelimiter",
+ "skipLeadingRows",
+ "ignoreUnknownValues",
+ "nullMarker",
+ "quote",
+ "encoding",
+ ],
+ "DATASTORE_BACKUP": ["projectionFields"],
+ "NEWLINE_DELIMITED_JSON": ["autodetect", "ignoreUnknownValues"],
+ "PARQUET": ["autodetect", "ignoreUnknownValues"],
+ "AVRO": ["useAvroLogicalTypes"],
+ }
+
+ valid_configs = src_fmt_to_configs_mapping[self.source_format]
+
+ # if following fields are not specified in src_fmt_configs,
+ # honor the top-level params for backward-compatibility
+ backward_compatibility_configs = {
+ "skipLeadingRows": self.skip_leading_rows,
+ "fieldDelimiter": self.field_delimiter,
+ "ignoreUnknownValues": self.ignore_unknown_values,
+ "quote": self.quote_character,
+ "allowQuotedNewlines": self.allow_quoted_newlines,
+ "encoding": self.encoding,
+ }
+
+ self.src_fmt_configs = self._validate_src_fmt_configs(
+ self.source_format, self.src_fmt_configs, valid_configs,
backward_compatibility_configs
+ )
- if any(re.match(r"[\d\-\\.]+$", value) for value in values):
- return table_resource
- else:
- schema_fields = []
- for field in fields:
- schema_fields.append({"name": field, "type": "STRING",
"mode": "NULLABLE"})
- self.schema_fields = schema_fields
- if self.external_table:
-
table_resource["externalDataConfiguration"]["csvOptions"]["skipLeadingRows"] = 1
- elif not self.external_table:
- table_resource["load"]["skipLeadingRows"] = 1
- if self.external_table:
- table_resource["schema"] = {"fields": self.schema_fields}
- elif not self.external_table:
- table_resource["load"]["schema"] = {"fields": self.schema_fields}
- return table_resource
+ self.configuration["load"].update(self.src_fmt_configs)
+
+ if self.allow_jagged_rows:
+ self.configuration["load"]["allowJaggedRows"] =
self.allow_jagged_rows
+ return self.configuration
+
+ def _validate_src_fmt_configs(
+ self,
+ source_format: str,
+ src_fmt_configs: dict,
+ valid_configs: list[str],
+ backward_compatibility_configs: dict | None = None,
+ ) -> dict:
+ """
+ Validates the given src_fmt_configs against a valid configuration for
the source format.
+ Adds the backward compatibility config to the src_fmt_configs.
+
+ :param source_format: File format to export.
+ :param src_fmt_configs: Configure optional fields specific to the
source format.
+ :param valid_configs: Valid configuration specific to the source format
+ :param backward_compatibility_configs: The top-level params for
backward-compatibility
+ """
+ if backward_compatibility_configs is None:
+ backward_compatibility_configs = {}
+
+ for k, v in backward_compatibility_configs.items():
+ if k not in src_fmt_configs and k in valid_configs:
+ src_fmt_configs[k] = v
+
+ for k, v in src_fmt_configs.items():
+ if k not in valid_configs:
+ raise ValueError(f"{k} is not a valid src_fmt_configs for type
{source_format}.")
+
+ return src_fmt_configs
+
+ def _cleanse_time_partitioning(
+ self, destination_dataset_table: str | None, time_partitioning_in:
dict | None
+ ) -> dict: # if it is a partitioned table ($ is in the table name) add
partition load option
+
+ if time_partitioning_in is None:
+ time_partitioning_in = {}
+
+ time_partitioning_out = {}
+ if destination_dataset_table and "$" in destination_dataset_table:
+ time_partitioning_out["type"] = "DAY"
+ time_partitioning_out.update(time_partitioning_in)
+ return time_partitioning_out
def on_kill(self) -> None:
if self.job_id and self.cancel_on_kill:
diff --git a/airflow/providers/google/cloud/triggers/bigquery.py b/airflow/providers/google/cloud/triggers/bigquery.py
index 271b02e3fc..9dd780d24a 100644
--- a/airflow/providers/google/cloud/triggers/bigquery.py
+++ b/airflow/providers/google/cloud/triggers/bigquery.py
@@ -88,12 +88,14 @@ class BigQueryInsertJobTrigger(BaseTrigger):
"message": "Job completed",
}
)
+ return
elif response_from_hook == "pending":
self.log.info("Query is still running...")
self.log.info("Sleeping for %s seconds.",
self.poll_interval)
await asyncio.sleep(self.poll_interval)
else:
yield TriggerEvent({"status": "error", "message":
response_from_hook})
+ return
except Exception as e:
self.log.exception("Exception occurred while checking for
query completion")
diff --git a/tests/providers/google/cloud/transfers/test_gcs_to_bigquery.py b/tests/providers/google/cloud/transfers/test_gcs_to_bigquery.py
index 1a9356134c..5340448083 100644
--- a/tests/providers/google/cloud/transfers/test_gcs_to_bigquery.py
+++ b/tests/providers/google/cloud/transfers/test_gcs_to_bigquery.py
@@ -54,8 +54,9 @@ SCHEMA_FIELDS_INT = [
]
SCHEMA_BUCKET = "test-schema-bucket"
SCHEMA_OBJECT = "test/schema/schema.json"
-TEST_SOURCE_OBJECTS = ["test/objects/test.csv"]
-TEST_SOURCE_OBJECTS_AS_STRING = "test/objects/test.csv"
+TEST_SOURCE_OBJECTS_LIST = ["test/objects/test.csv"]
+TEST_SOURCE_OBJECTS = "test/objects/test.csv"
+TEST_SOURCE_OBJECTS_JSON = "test/objects/test.json"
LABELS = {"k1": "v1"}
DESCRIPTION = "Test Description"
@@ -63,9 +64,11 @@ job_id = "123456"
hash_ = "hash"
pytest.real_job_id = f"{job_id}_{hash_}"
+GCS_TO_BQ_PATH = "airflow.providers.google.cloud.transfers.gcs_to_bigquery.{}"
+
class TestGCSToBigQueryOperator(unittest.TestCase):
-
@mock.patch("airflow.providers.google.cloud.transfers.gcs_to_bigquery.BigQueryHook")
+ @mock.patch(GCS_TO_BQ_PATH.format("BigQueryHook"))
def test_max_value_external_table_should_execute_successfully(self, hook):
hook.return_value.insert_job.side_effect = [
MagicMock(job_id=pytest.real_job_id, error_result=False),
@@ -89,28 +92,29 @@ class TestGCSToBigQueryOperator(unittest.TestCase):
assert result == "1"
hook.return_value.create_empty_table.assert_called_once_with(
+ exists_ok=True,
+ location=None,
+ project_id=PROJECT_ID,
table_resource={
"tableReference": {"projectId": PROJECT_ID, "datasetId":
DATASET, "tableId": TABLE},
- "labels": None,
- "description": None,
+ "labels": {},
"externalDataConfiguration": {
- "source_uris":
[f"gs://{TEST_BUCKET}/{TEST_SOURCE_OBJECTS_AS_STRING}"],
- "source_format": "CSV",
- "maxBadRecords": 0,
"autodetect": True,
+ "sourceFormat": "CSV",
+ "sourceUris":
[f"gs://{TEST_BUCKET}/{TEST_SOURCE_OBJECTS}"],
"compression": "NONE",
+ "ignoreUnknownValues": False,
"csvOptions": {
- "fieldDelimeter": ",",
"skipLeadingRows": None,
+ "fieldDelimiter": ",",
"quote": None,
"allowQuotedNewlines": False,
"allowJaggedRows": False,
+ "encoding": "UTF-8",
},
+ "schema": {"fields": SCHEMA_FIELDS},
},
- "location": None,
- "encryptionConfiguration": None,
- "schema": {"fields": SCHEMA_FIELDS},
- }
+ },
)
hook.return_value.insert_job.assert_called_once_with(
configuration={
@@ -123,7 +127,7 @@ class TestGCSToBigQueryOperator(unittest.TestCase):
project_id=hook.return_value.project_id,
)
-
@mock.patch("airflow.providers.google.cloud.transfers.gcs_to_bigquery.BigQueryHook")
+ @mock.patch(GCS_TO_BQ_PATH.format("BigQueryHook"))
def
test_max_value_without_external_table_should_execute_successfully(self, hook):
hook.return_value.insert_job.side_effect = [
MagicMock(job_id=pytest.real_job_id, error_result=False),
@@ -150,35 +154,28 @@ class TestGCSToBigQueryOperator(unittest.TestCase):
calls = [
call(
configuration={
- "load": dict(
- autodetect=True,
- createDisposition="CREATE_IF_NEEDED",
- destinationTable={"projectId": PROJECT_ID,
"datasetId": DATASET, "tableId": TABLE},
- destinationTableProperties={
- "description": None,
- "labels": None,
- },
- sourceFormat="CSV",
- skipLeadingRows=None,
-
sourceUris=[f"gs://{TEST_BUCKET}/{TEST_SOURCE_OBJECTS_AS_STRING}"],
- writeDisposition=WRITE_DISPOSITION,
- ignoreUnknownValues=False,
- allowQuotedNewlines=False,
- encoding="UTF-8",
- schema={"fields": SCHEMA_FIELDS},
- allowJaggedRows=False,
- fieldDelimiter=",",
- maxBadRecords=0,
- quote=None,
- schemaUpdateOptions=(),
- ),
+ "load": {
+ "autodetect": True,
+ "createDisposition": "CREATE_IF_NEEDED",
+ "destinationTable": {"projectId": PROJECT_ID,
"datasetId": DATASET, "tableId": TABLE},
+ "sourceFormat": "CSV",
+ "sourceUris":
[f"gs://{TEST_BUCKET}/{TEST_SOURCE_OBJECTS}"],
+ "writeDisposition": WRITE_DISPOSITION,
+ "ignoreUnknownValues": False,
+ "schema": {"fields": SCHEMA_FIELDS},
+ "skipLeadingRows": None,
+ "fieldDelimiter": ",",
+ "quote": None,
+ "allowQuotedNewlines": False,
+ "encoding": "UTF-8",
+ }
},
- project_id=hook.return_value.project_id,
- location=None,
job_id=pytest.real_job_id,
- timeout=None,
- retry=DEFAULT_RETRY,
+ location=None,
nowait=True,
+ project_id=hook.return_value.project_id,
+ retry=DEFAULT_RETRY,
+ timeout=None,
),
call(
configuration={
@@ -194,7 +191,7 @@ class TestGCSToBigQueryOperator(unittest.TestCase):
hook.return_value.insert_job.assert_has_calls(calls)
-
@mock.patch("airflow.providers.google.cloud.transfers.gcs_to_bigquery.BigQueryHook")
+ @mock.patch(GCS_TO_BQ_PATH.format("BigQueryHook"))
def test_max_value_should_throw_ex_when_query_returns_no_rows(self, hook):
hook.return_value.insert_job.side_effect = [
MagicMock(job_id=pytest.real_job_id, error_result=False),
@@ -218,35 +215,28 @@ class TestGCSToBigQueryOperator(unittest.TestCase):
calls = [
call(
configuration={
- "load": dict(
- autodetect=True,
- createDisposition="CREATE_IF_NEEDED",
- destinationTable={"projectId": PROJECT_ID,
"datasetId": DATASET, "tableId": TABLE},
- destinationTableProperties={
- "description": None,
- "labels": None,
- },
- sourceFormat="CSV",
- skipLeadingRows=None,
-
sourceUris=[f"gs://{TEST_BUCKET}/{TEST_SOURCE_OBJECTS_AS_STRING}"],
- writeDisposition=WRITE_DISPOSITION,
- ignoreUnknownValues=False,
- allowQuotedNewlines=False,
- encoding="UTF-8",
- schema={"fields": SCHEMA_FIELDS},
- allowJaggedRows=False,
- fieldDelimiter=",",
- maxBadRecords=0,
- quote=None,
- schemaUpdateOptions=(),
- ),
+ "load": {
+ "autodetect": True,
+ "createDisposition": "CREATE_IF_NEEDED",
+ "destinationTable": {"projectId": PROJECT_ID,
"datasetId": DATASET, "tableId": TABLE},
+ "sourceFormat": "CSV",
+ "sourceUris":
[f"gs://{TEST_BUCKET}/{TEST_SOURCE_OBJECTS}"],
+ "writeDisposition": "WRITE_TRUNCATE",
+ "ignoreUnknownValues": False,
+ "schema": {"fields": SCHEMA_FIELDS},
+ "skipLeadingRows": None,
+ "fieldDelimiter": ",",
+ "quote": None,
+ "allowQuotedNewlines": False,
+ "encoding": "UTF-8",
+ }
},
- project_id=hook.return_value.project_id,
- location=None,
job_id=pytest.real_job_id,
- timeout=None,
- retry=DEFAULT_RETRY,
+ location=None,
nowait=True,
+ project_id=hook.return_value.project_id,
+ retry=DEFAULT_RETRY,
+ timeout=None,
),
call(
configuration={
@@ -262,7 +252,7 @@ class TestGCSToBigQueryOperator(unittest.TestCase):
hook.return_value.insert_job.assert_has_calls(calls)
-
@mock.patch("airflow.providers.google.cloud.transfers.gcs_to_bigquery.BigQueryHook")
+ @mock.patch(GCS_TO_BQ_PATH.format("BigQueryHook"))
def test_labels_external_table_should_execute_successfully(self, hook):
hook.return_value.insert_job.side_effect = [
MagicMock(job_id=pytest.real_job_id, error_result=False),
@@ -284,31 +274,32 @@ class TestGCSToBigQueryOperator(unittest.TestCase):
operator.execute(context=MagicMock())
hook.return_value.create_empty_table.assert_called_once_with(
+ exists_ok=True,
+ location=None,
+ project_id=PROJECT_ID,
table_resource={
"tableReference": {"projectId": PROJECT_ID, "datasetId":
DATASET, "tableId": TABLE},
"labels": LABELS,
- "description": None,
"externalDataConfiguration": {
- "source_uris":
[f"gs://{TEST_BUCKET}/{TEST_SOURCE_OBJECTS_AS_STRING}"],
- "source_format": "CSV",
- "maxBadRecords": 0,
"autodetect": True,
+ "sourceFormat": "CSV",
+ "sourceUris":
[f"gs://{TEST_BUCKET}/{TEST_SOURCE_OBJECTS}"],
"compression": "NONE",
+ "ignoreUnknownValues": False,
"csvOptions": {
- "fieldDelimeter": ",",
"skipLeadingRows": None,
+ "fieldDelimiter": ",",
"quote": None,
"allowQuotedNewlines": False,
"allowJaggedRows": False,
+ "encoding": "UTF-8",
},
+ "schema": {"fields": SCHEMA_FIELDS},
},
- "location": None,
- "encryptionConfiguration": None,
- "schema": {"fields": SCHEMA_FIELDS},
- }
+ },
)
-
@mock.patch("airflow.providers.google.cloud.transfers.gcs_to_bigquery.BigQueryHook")
+ @mock.patch(GCS_TO_BQ_PATH.format("BigQueryHook"))
def test_labels_without_external_table_should_execute_successfully(self,
hook):
hook.return_value.insert_job.side_effect = [
MagicMock(job_id=pytest.real_job_id, error_result=False),
@@ -332,41 +323,35 @@ class TestGCSToBigQueryOperator(unittest.TestCase):
calls = [
call(
configuration={
- "load": dict(
- autodetect=True,
- createDisposition="CREATE_IF_NEEDED",
- destinationTable={"projectId": PROJECT_ID,
"datasetId": DATASET, "tableId": TABLE},
- destinationTableProperties={
- "description": None,
- "labels": LABELS,
- },
- sourceFormat="CSV",
- skipLeadingRows=None,
-
sourceUris=[f"gs://{TEST_BUCKET}/{TEST_SOURCE_OBJECTS_AS_STRING}"],
- writeDisposition=WRITE_DISPOSITION,
- ignoreUnknownValues=False,
- allowQuotedNewlines=False,
- encoding="UTF-8",
- schema={"fields": SCHEMA_FIELDS},
- allowJaggedRows=False,
- fieldDelimiter=",",
- maxBadRecords=0,
- quote=None,
- schemaUpdateOptions=(),
- ),
+ "load": {
+ "autodetect": True,
+ "createDisposition": "CREATE_IF_NEEDED",
+ "destinationTable": {"projectId": PROJECT_ID,
"datasetId": DATASET, "tableId": TABLE},
+ "sourceFormat": "CSV",
+ "sourceUris":
[f"gs://{TEST_BUCKET}/{TEST_SOURCE_OBJECTS}"],
+ "writeDisposition": "WRITE_TRUNCATE",
+ "ignoreUnknownValues": False,
+ "schema": {"fields": SCHEMA_FIELDS},
+ "destinationTableProperties": {"labels": LABELS},
+ "skipLeadingRows": None,
+ "fieldDelimiter": ",",
+ "quote": None,
+ "allowQuotedNewlines": False,
+ "encoding": "UTF-8",
+ }
},
- project_id=hook.return_value.project_id,
- location=None,
job_id=pytest.real_job_id,
- timeout=None,
- retry=DEFAULT_RETRY,
+ location=None,
nowait=True,
- ),
+ project_id=hook.return_value.project_id,
+ retry=DEFAULT_RETRY,
+ timeout=None,
+ )
]
hook.return_value.insert_job.assert_has_calls(calls)
-
@mock.patch("airflow.providers.google.cloud.transfers.gcs_to_bigquery.BigQueryHook")
+ @mock.patch(GCS_TO_BQ_PATH.format("BigQueryHook"))
def test_description_external_table_should_execute_successfully(self,
hook):
hook.return_value.insert_job.side_effect = [
MagicMock(job_id=pytest.real_job_id, error_result=False),
@@ -388,31 +373,33 @@ class TestGCSToBigQueryOperator(unittest.TestCase):
operator.execute(context=MagicMock())
hook.return_value.create_empty_table.assert_called_once_with(
+ exists_ok=True,
+ location=None,
+ project_id=PROJECT_ID,
table_resource={
"tableReference": {"projectId": PROJECT_ID, "datasetId":
DATASET, "tableId": TABLE},
- "labels": None,
- "description": DESCRIPTION,
+ "labels": {},
"externalDataConfiguration": {
- "source_uris":
[f"gs://{TEST_BUCKET}/{TEST_SOURCE_OBJECTS_AS_STRING}"],
- "source_format": "CSV",
- "maxBadRecords": 0,
"autodetect": True,
+ "sourceFormat": "CSV",
+ "sourceUris":
[f"gs://{TEST_BUCKET}/{TEST_SOURCE_OBJECTS}"],
"compression": "NONE",
+ "ignoreUnknownValues": False,
"csvOptions": {
- "fieldDelimeter": ",",
"skipLeadingRows": None,
+ "fieldDelimiter": ",",
"quote": None,
"allowQuotedNewlines": False,
"allowJaggedRows": False,
+ "encoding": "UTF-8",
},
+ "schema": {"fields": SCHEMA_FIELDS},
},
- "location": None,
- "encryptionConfiguration": None,
- "schema": {"fields": SCHEMA_FIELDS},
- }
+ "description": DESCRIPTION,
+ },
)
-
@mock.patch("airflow.providers.google.cloud.transfers.gcs_to_bigquery.BigQueryHook")
+ @mock.patch(GCS_TO_BQ_PATH.format("BigQueryHook"))
def
test_description_without_external_table_should_execute_successfully(self, hook):
hook.return_value.insert_job.side_effect = [
MagicMock(job_id=pytest.real_job_id, error_result=False),
@@ -442,21 +429,17 @@ class TestGCSToBigQueryOperator(unittest.TestCase):
destinationTable={"projectId": PROJECT_ID,
"datasetId": DATASET, "tableId": TABLE},
destinationTableProperties={
"description": DESCRIPTION,
- "labels": None,
},
sourceFormat="CSV",
skipLeadingRows=None,
-
sourceUris=[f"gs://{TEST_BUCKET}/{TEST_SOURCE_OBJECTS_AS_STRING}"],
+
sourceUris=[f"gs://{TEST_BUCKET}/{TEST_SOURCE_OBJECTS}"],
writeDisposition=WRITE_DISPOSITION,
ignoreUnknownValues=False,
allowQuotedNewlines=False,
encoding="UTF-8",
schema={"fields": SCHEMA_FIELDS},
- allowJaggedRows=False,
- fieldDelimiter=",",
- maxBadRecords=0,
quote=None,
- schemaUpdateOptions=(),
+ fieldDelimiter=",",
),
},
project_id=hook.return_value.project_id,
@@ -467,10 +450,9 @@ class TestGCSToBigQueryOperator(unittest.TestCase):
nowait=True,
),
]
-
hook.return_value.insert_job.assert_has_calls(calls)
-
@mock.patch("airflow.providers.google.cloud.transfers.gcs_to_bigquery.BigQueryHook")
+ @mock.patch(GCS_TO_BQ_PATH.format("BigQueryHook"))
def
test_source_objs_as_list_external_table_should_execute_successfully(self, hook):
hook.return_value.insert_job.side_effect = [
MagicMock(job_id=pytest.real_job_id, error_result=False),
@@ -491,33 +473,34 @@ class TestGCSToBigQueryOperator(unittest.TestCase):
operator.execute(context=MagicMock())
hook.return_value.create_empty_table.assert_called_once_with(
+ exists_ok=True,
+ location=None,
+ project_id=PROJECT_ID,
table_resource={
"tableReference": {"projectId": PROJECT_ID, "datasetId":
DATASET, "tableId": TABLE},
- "labels": None,
- "description": None,
+ "labels": {},
"externalDataConfiguration": {
- "source_uris": [
- f"gs://{TEST_BUCKET}/{source_object}" for
source_object in TEST_SOURCE_OBJECTS
- ],
- "source_format": "CSV",
- "maxBadRecords": 0,
"autodetect": True,
+ "sourceFormat": "CSV",
+ "sourceUris": [
+ f"gs://{TEST_BUCKET}/{source_object}" for
source_object in TEST_SOURCE_OBJECTS_LIST
+ ],
"compression": "NONE",
+ "ignoreUnknownValues": False,
"csvOptions": {
- "fieldDelimeter": ",",
"skipLeadingRows": None,
+ "fieldDelimiter": ",",
"quote": None,
"allowQuotedNewlines": False,
"allowJaggedRows": False,
+ "encoding": "UTF-8",
},
+ "schema": {"fields": SCHEMA_FIELDS},
},
- "location": None,
- "encryptionConfiguration": None,
- "schema": {"fields": SCHEMA_FIELDS},
- }
+ },
)
-
@mock.patch("airflow.providers.google.cloud.transfers.gcs_to_bigquery.BigQueryHook")
+ @mock.patch(GCS_TO_BQ_PATH.format("BigQueryHook"))
def
test_source_objs_as_list_without_external_table_should_execute_successfully(self,
hook):
hook.return_value.insert_job.side_effect = [
MagicMock(job_id=pytest.real_job_id, error_result=False),
@@ -540,43 +523,38 @@ class TestGCSToBigQueryOperator(unittest.TestCase):
calls = [
call(
configuration={
- "load": dict(
- autodetect=True,
- createDisposition="CREATE_IF_NEEDED",
- destinationTable={"projectId": PROJECT_ID,
"datasetId": DATASET, "tableId": TABLE},
- destinationTableProperties={
- "description": None,
- "labels": None,
+ "load": {
+ "autodetect": True,
+ "createDisposition": "CREATE_IF_NEEDED",
+ "destinationTable": {
+ "projectId": "test-project",
+ "datasetId": "dataset",
+ "tableId": "table",
},
- sourceFormat="CSV",
- skipLeadingRows=None,
- sourceUris=[
- f"gs://{TEST_BUCKET}/{source_object}" for
source_object in TEST_SOURCE_OBJECTS
- ],
- writeDisposition=WRITE_DISPOSITION,
- ignoreUnknownValues=False,
- allowQuotedNewlines=False,
- encoding="UTF-8",
- schema={"fields": SCHEMA_FIELDS},
- allowJaggedRows=False,
- fieldDelimiter=",",
- maxBadRecords=0,
- quote=None,
- schemaUpdateOptions=(),
- ),
+ "sourceFormat": "CSV",
+ "sourceUris":
["gs://test-bucket/test/objects/test.csv"],
+ "writeDisposition": "WRITE_TRUNCATE",
+ "ignoreUnknownValues": False,
+ "schema": {"fields": SCHEMA_FIELDS},
+ "skipLeadingRows": None,
+ "fieldDelimiter": ",",
+ "quote": None,
+ "allowQuotedNewlines": False,
+ "encoding": "UTF-8",
+ }
},
- project_id=hook.return_value.project_id,
- location=None,
job_id=pytest.real_job_id,
- timeout=None,
- retry=DEFAULT_RETRY,
+ location=None,
nowait=True,
- ),
+ project_id=hook.return_value.project_id,
+ retry=DEFAULT_RETRY,
+ timeout=None,
+ )
]
hook.return_value.insert_job.assert_has_calls(calls)
-
@mock.patch("airflow.providers.google.cloud.transfers.gcs_to_bigquery.BigQueryHook")
+ @mock.patch(GCS_TO_BQ_PATH.format("BigQueryHook"))
def
test_source_objs_as_string_external_table_should_execute_successfully(self,
hook):
hook.return_value.insert_job.side_effect = [
MagicMock(job_id=pytest.real_job_id, error_result=False),
@@ -597,31 +575,32 @@ class TestGCSToBigQueryOperator(unittest.TestCase):
operator.execute(context=MagicMock())
hook.return_value.create_empty_table.assert_called_once_with(
+ exists_ok=True,
+ location=None,
+ project_id=PROJECT_ID,
table_resource={
"tableReference": {"projectId": PROJECT_ID, "datasetId":
DATASET, "tableId": TABLE},
- "labels": None,
- "description": None,
+ "labels": {},
"externalDataConfiguration": {
- "source_uris":
[f"gs://{TEST_BUCKET}/{TEST_SOURCE_OBJECTS_AS_STRING}"],
- "source_format": "CSV",
- "maxBadRecords": 0,
"autodetect": True,
+ "sourceFormat": "CSV",
+ "sourceUris":
[f"gs://{TEST_BUCKET}/{TEST_SOURCE_OBJECTS}"],
"compression": "NONE",
+ "ignoreUnknownValues": False,
"csvOptions": {
- "fieldDelimeter": ",",
"skipLeadingRows": None,
+ "fieldDelimiter": ",",
"quote": None,
"allowQuotedNewlines": False,
"allowJaggedRows": False,
+ "encoding": "UTF-8",
},
+ "schema": {"fields": SCHEMA_FIELDS},
},
- "location": None,
- "encryptionConfiguration": None,
- "schema": {"fields": SCHEMA_FIELDS},
- }
+ },
)
-
@mock.patch("airflow.providers.google.cloud.transfers.gcs_to_bigquery.BigQueryHook")
+ @mock.patch(GCS_TO_BQ_PATH.format("BigQueryHook"))
def
test_source_objs_as_string_without_external_table_should_execute_successfully(self,
hook):
hook.return_value.insert_job.side_effect = [
MagicMock(job_id=pytest.real_job_id, error_result=False),
@@ -644,42 +623,39 @@ class TestGCSToBigQueryOperator(unittest.TestCase):
calls = [
call(
configuration={
- "load": dict(
- autodetect=True,
- createDisposition="CREATE_IF_NEEDED",
- destinationTable={"projectId": PROJECT_ID,
"datasetId": DATASET, "tableId": TABLE},
- destinationTableProperties={
- "description": None,
- "labels": None,
+ "load": {
+ "autodetect": True,
+ "createDisposition": "CREATE_IF_NEEDED",
+ "destinationTable": {
+ "projectId": PROJECT_ID,
+ "datasetId": DATASET,
+ "tableId": "table",
},
- sourceFormat="CSV",
- skipLeadingRows=None,
-
sourceUris=[f"gs://{TEST_BUCKET}/{TEST_SOURCE_OBJECTS_AS_STRING}"],
- writeDisposition=WRITE_DISPOSITION,
- ignoreUnknownValues=False,
- allowQuotedNewlines=False,
- encoding="UTF-8",
- schema={"fields": SCHEMA_FIELDS},
- allowJaggedRows=False,
- fieldDelimiter=",",
- maxBadRecords=0,
- quote=None,
- schemaUpdateOptions=(),
- ),
+ "sourceFormat": "CSV",
+ "sourceUris":
[f"gs://{TEST_BUCKET}/{TEST_SOURCE_OBJECTS}"],
+ "writeDisposition": "WRITE_TRUNCATE",
+ "ignoreUnknownValues": False,
+ "schema": {"fields": SCHEMA_FIELDS},
+ "skipLeadingRows": None,
+ "fieldDelimiter": ",",
+ "quote": None,
+ "allowQuotedNewlines": False,
+ "encoding": "UTF-8",
+ }
},
- project_id=hook.return_value.project_id,
- location=None,
job_id=pytest.real_job_id,
- timeout=None,
- retry=DEFAULT_RETRY,
+ location=None,
nowait=True,
- ),
+ project_id=hook.return_value.project_id,
+ retry=DEFAULT_RETRY,
+ timeout=None,
+ )
]
hook.return_value.insert_job.assert_has_calls(calls)
-
@mock.patch("airflow.providers.google.cloud.transfers.gcs_to_bigquery.GCSHook")
-
@mock.patch("airflow.providers.google.cloud.transfers.gcs_to_bigquery.BigQueryHook")
+ @mock.patch(GCS_TO_BQ_PATH.format("GCSHook"))
+ @mock.patch(GCS_TO_BQ_PATH.format("BigQueryHook"))
def test_schema_obj_external_table_should_execute_successfully(self,
bq_hook, gcs_hook):
bq_hook.return_value.insert_job.side_effect = [
MagicMock(job_id=pytest.real_job_id, error_result=False),
@@ -702,33 +678,34 @@ class TestGCSToBigQueryOperator(unittest.TestCase):
operator.execute(context=MagicMock())
bq_hook.return_value.create_empty_table.assert_called_once_with(
+ exists_ok=True,
+ location=None,
+ project_id=PROJECT_ID,
table_resource={
"tableReference": {"projectId": PROJECT_ID, "datasetId":
DATASET, "tableId": TABLE},
- "labels": None,
- "description": None,
+ "labels": {},
"externalDataConfiguration": {
- "source_uris":
[f"gs://{TEST_BUCKET}/{TEST_SOURCE_OBJECTS_AS_STRING}"],
- "source_format": "CSV",
- "maxBadRecords": 0,
"autodetect": True,
+ "sourceFormat": "CSV",
+ "sourceUris":
[f"gs://{TEST_BUCKET}/{TEST_SOURCE_OBJECTS}"],
"compression": "NONE",
+ "ignoreUnknownValues": False,
"csvOptions": {
- "fieldDelimeter": ",",
"skipLeadingRows": None,
+ "fieldDelimiter": ",",
"quote": None,
"allowQuotedNewlines": False,
"allowJaggedRows": False,
+ "encoding": "UTF-8",
},
+ "schema": {"fields": SCHEMA_FIELDS},
},
- "location": None,
- "encryptionConfiguration": None,
- "schema": {"fields": SCHEMA_FIELDS},
- }
+ },
)
gcs_hook.return_value.download.assert_called_once_with(SCHEMA_BUCKET,
SCHEMA_OBJECT)
-
@mock.patch("airflow.providers.google.cloud.transfers.gcs_to_bigquery.GCSHook")
-
@mock.patch("airflow.providers.google.cloud.transfers.gcs_to_bigquery.BigQueryHook")
+ @mock.patch(GCS_TO_BQ_PATH.format("GCSHook"))
+ @mock.patch(GCS_TO_BQ_PATH.format("BigQueryHook"))
def
test_schema_obj_without_external_table_should_execute_successfully(self,
bq_hook, gcs_hook):
bq_hook.return_value.insert_job.side_effect = [
MagicMock(job_id=pytest.real_job_id, error_result=False),
@@ -754,28 +731,21 @@ class TestGCSToBigQueryOperator(unittest.TestCase):
calls = [
call(
configuration={
- "load": dict(
- autodetect=True,
- createDisposition="CREATE_IF_NEEDED",
- destinationTable={"projectId": PROJECT_ID,
"datasetId": DATASET, "tableId": TABLE},
- destinationTableProperties={
- "description": None,
- "labels": None,
- },
- sourceFormat="CSV",
- skipLeadingRows=None,
-
sourceUris=[f"gs://{TEST_BUCKET}/{TEST_SOURCE_OBJECTS_AS_STRING}"],
- writeDisposition=WRITE_DISPOSITION,
- ignoreUnknownValues=False,
- allowQuotedNewlines=False,
- encoding="UTF-8",
- schema={"fields": SCHEMA_FIELDS},
- allowJaggedRows=False,
- fieldDelimiter=",",
- maxBadRecords=0,
- quote=None,
- schemaUpdateOptions=(),
- ),
+ "load": {
+ "autodetect": True,
+ "createDisposition": "CREATE_IF_NEEDED",
+ "destinationTable": {"projectId": PROJECT_ID,
"datasetId": DATASET, "tableId": TABLE},
+ "sourceFormat": "CSV",
+ "sourceUris":
[f"gs://{TEST_BUCKET}/{TEST_SOURCE_OBJECTS}"],
+ "writeDisposition": "WRITE_TRUNCATE",
+ "ignoreUnknownValues": False,
+ "schema": {"fields": SCHEMA_FIELDS},
+ "skipLeadingRows": None,
+ "fieldDelimiter": ",",
+ "quote": None,
+ "allowQuotedNewlines": False,
+ "encoding": "UTF-8",
+ }
},
project_id=bq_hook.return_value.project_id,
location=None,
@@ -783,266 +753,113 @@ class TestGCSToBigQueryOperator(unittest.TestCase):
timeout=None,
retry=DEFAULT_RETRY,
nowait=True,
- ),
+ )
]
bq_hook.return_value.insert_job.assert_has_calls(calls)
gcs_hook.return_value.download.assert_called_once_with(SCHEMA_BUCKET,
SCHEMA_OBJECT)
-
@mock.patch("airflow.providers.google.cloud.transfers.gcs_to_bigquery.BigQueryHook")
- def test_all_fields_should_be_present(self, hook):
+ @mock.patch(GCS_TO_BQ_PATH.format("BigQueryHook"))
+ def test_execute_should_throw_ex_when_no_bucket_specified(self, hook):
hook.return_value.insert_job.side_effect = [
MagicMock(job_id=pytest.real_job_id, error_result=False),
pytest.real_job_id,
]
hook.return_value.generate_job_id.return_value = pytest.real_job_id
hook.return_value.split_tablename.return_value = (PROJECT_ID, DATASET,
TABLE)
- operator = GCSToBigQueryOperator(
- task_id=TASK_ID,
- bucket=TEST_BUCKET,
- source_objects=TEST_SOURCE_OBJECTS,
- schema_fields=SCHEMA_FIELDS,
- destination_project_dataset_table=TEST_EXPLICIT_DEST,
- write_disposition=WRITE_DISPOSITION,
- external_table=False,
- field_delimiter=";",
- max_bad_records=13,
- quote_character="|",
- schema_update_options={"foo": "bar"},
- allow_jagged_rows=True,
- encryption_configuration={"bar": "baz"},
- cluster_fields=["field_1", "field_2"],
- )
-
- operator.execute(context=MagicMock())
+ with pytest.raises(AirflowException, match=r"missing keyword argument
'bucket'"):
+ operator = GCSToBigQueryOperator(
+ task_id=TASK_ID,
+ source_objects=TEST_SOURCE_OBJECTS,
+ destination_project_dataset_table=TEST_EXPLICIT_DEST,
+ schema_fields=SCHEMA_FIELDS,
+ max_id_key=MAX_ID_KEY,
+ write_disposition=WRITE_DISPOSITION,
+ external_table=False,
+ )
+ operator.execute(context=MagicMock())
- calls = [
- call(
- configuration={
- "load": dict(
- autodetect=True,
- createDisposition="CREATE_IF_NEEDED",
- destinationTable={"projectId": PROJECT_ID,
"datasetId": DATASET, "tableId": TABLE},
- destinationTableProperties={
- "description": None,
- "labels": None,
- },
- sourceFormat="CSV",
- skipLeadingRows=None,
-
sourceUris=[f"gs://{TEST_BUCKET}/{TEST_SOURCE_OBJECTS_AS_STRING}"],
- writeDisposition=WRITE_DISPOSITION,
- ignoreUnknownValues=False,
- allowQuotedNewlines=False,
- encoding="UTF-8",
- schema={"fields": SCHEMA_FIELDS},
- allowJaggedRows=True,
- fieldDelimiter=";",
- maxBadRecords=13,
- quote="|",
- schemaUpdateOptions={"foo": "bar"},
- destinationEncryptionConfiguration={"bar": "baz"},
- clustering={"fields": ["field_1", "field_2"]},
- ),
- },
- project_id=hook.return_value.project_id,
- location=None,
- job_id=pytest.real_job_id,
- timeout=None,
- retry=DEFAULT_RETRY,
- nowait=True,
- ),
+ @mock.patch(GCS_TO_BQ_PATH.format("BigQueryHook"))
+ def test_execute_should_throw_ex_when_no_source_objects_specified(self,
hook):
+ hook.return_value.insert_job.side_effect = [
+ MagicMock(job_id=pytest.real_job_id, error_result=False),
+ pytest.real_job_id,
]
+ hook.return_value.generate_job_id.return_value = pytest.real_job_id
+ hook.return_value.split_tablename.return_value = (PROJECT_ID, DATASET,
TABLE)
+ with pytest.raises(AirflowException, match=r"missing keyword argument
'source_objects'"):
+ operator = GCSToBigQueryOperator(
+ task_id=TASK_ID,
+ destination_project_dataset_table=TEST_EXPLICIT_DEST,
+ schema_fields=SCHEMA_FIELDS,
+ bucket=TEST_BUCKET,
+ max_id_key=MAX_ID_KEY,
+ write_disposition=WRITE_DISPOSITION,
+ external_table=False,
+ )
+ operator.execute(context=MagicMock())
- hook.return_value.insert_job.assert_has_calls(calls)
-
-
@mock.patch("airflow.providers.google.cloud.transfers.gcs_to_bigquery.BigQueryHook")
- def test_date_partitioned_explicit_setting_should_be_found(self, hook):
+ @mock.patch(GCS_TO_BQ_PATH.format("BigQueryHook"))
+ def
test_execute_should_throw_ex_when_no_destination_project_dataset_table_specified(self,
hook):
hook.return_value.insert_job.side_effect = [
MagicMock(job_id=pytest.real_job_id, error_result=False),
pytest.real_job_id,
]
hook.return_value.generate_job_id.return_value = pytest.real_job_id
hook.return_value.split_tablename.return_value = (PROJECT_ID, DATASET,
TABLE)
- operator = GCSToBigQueryOperator(
- task_id=TASK_ID,
- bucket=TEST_BUCKET,
- source_objects=TEST_SOURCE_OBJECTS,
- schema_fields=SCHEMA_FIELDS,
- destination_project_dataset_table=TEST_EXPLICIT_DEST,
- write_disposition=WRITE_DISPOSITION,
- external_table=False,
- time_partitioning={"type": "DAY"},
- )
-
- operator.execute(context=MagicMock())
-
- calls = [
- call(
- configuration={
- "load": dict(
- autodetect=True,
- createDisposition="CREATE_IF_NEEDED",
- destinationTable={"projectId": PROJECT_ID,
"datasetId": DATASET, "tableId": TABLE},
- destinationTableProperties={
- "description": None,
- "labels": None,
- },
- sourceFormat="CSV",
- skipLeadingRows=None,
-
sourceUris=[f"gs://{TEST_BUCKET}/{TEST_SOURCE_OBJECTS_AS_STRING}"],
- writeDisposition=WRITE_DISPOSITION,
- ignoreUnknownValues=False,
- allowQuotedNewlines=False,
- encoding="UTF-8",
- schema={"fields": SCHEMA_FIELDS},
- allowJaggedRows=False,
- fieldDelimiter=",",
- maxBadRecords=0,
- quote=None,
- schemaUpdateOptions=(),
- timePartitioning={"type": "DAY"},
- ),
- },
- project_id=hook.return_value.project_id,
- location=None,
- job_id=pytest.real_job_id,
- timeout=None,
- retry=DEFAULT_RETRY,
- nowait=True,
- ),
- ]
-
- hook.return_value.insert_job.assert_has_calls(calls)
-
-
@mock.patch("airflow.providers.google.cloud.transfers.gcs_to_bigquery.BigQueryHook")
- def test_date_partitioned_implied_in_table_name_should_be_found(self,
hook):
- hook.return_value.insert_job.side_effect = [
- MagicMock(job_id=pytest.real_job_id, error_result=False),
- pytest.real_job_id,
- ]
- hook.return_value.generate_job_id.return_value = pytest.real_job_id
- hook.return_value.split_tablename.return_value = (PROJECT_ID, DATASET,
TABLE)
- operator = GCSToBigQueryOperator(
- task_id=TASK_ID,
- bucket=TEST_BUCKET,
- source_objects=TEST_SOURCE_OBJECTS,
- schema_fields=SCHEMA_FIELDS,
- destination_project_dataset_table=TEST_EXPLICIT_DEST + "$20221123",
- write_disposition=WRITE_DISPOSITION,
- external_table=False,
- )
-
- operator.execute(context=MagicMock())
-
- calls = [
- call(
- configuration={
- "load": dict(
- autodetect=True,
- createDisposition="CREATE_IF_NEEDED",
- destinationTable={"projectId": PROJECT_ID,
"datasetId": DATASET, "tableId": TABLE},
- destinationTableProperties={
- "description": None,
- "labels": None,
- },
- sourceFormat="CSV",
- skipLeadingRows=None,
-
sourceUris=[f"gs://{TEST_BUCKET}/{TEST_SOURCE_OBJECTS_AS_STRING}"],
- writeDisposition=WRITE_DISPOSITION,
- ignoreUnknownValues=False,
- allowQuotedNewlines=False,
- encoding="UTF-8",
- schema={"fields": SCHEMA_FIELDS},
- allowJaggedRows=False,
- fieldDelimiter=",",
- maxBadRecords=0,
- quote=None,
- schemaUpdateOptions=(),
- timePartitioning={"type": "DAY"},
- ),
- },
- project_id=hook.return_value.project_id,
- location=None,
- job_id=pytest.real_job_id,
- timeout=None,
- retry=DEFAULT_RETRY,
- nowait=True,
- ),
- ]
-
- hook.return_value.insert_job.assert_has_calls(calls)
-
-
@mock.patch("airflow.providers.google.cloud.transfers.gcs_to_bigquery.BigQueryHook")
- def test_execute_should_throw_ex_when_no_bucket_specified(self, hook):
- hook.return_value.insert_job.side_effect = [
- MagicMock(job_id=pytest.real_job_id, error_result=False),
- pytest.real_job_id,
- ]
- hook.return_value.generate_job_id.return_value = pytest.real_job_id
- hook.return_value.split_tablename.return_value = (PROJECT_ID, DATASET,
TABLE)
- with pytest.raises(AirflowException, match=r"missing keyword argument
'bucket'"):
- operator = GCSToBigQueryOperator(
- task_id=TASK_ID,
- source_objects=TEST_SOURCE_OBJECTS,
- destination_project_dataset_table=TEST_EXPLICIT_DEST,
- schema_fields=SCHEMA_FIELDS,
- max_id_key=MAX_ID_KEY,
- write_disposition=WRITE_DISPOSITION,
- external_table=False,
- )
- operator.execute(context=MagicMock())
-
-
@mock.patch("airflow.providers.google.cloud.transfers.gcs_to_bigquery.BigQueryHook")
- def test_execute_should_throw_ex_when_no_source_objects_specified(self,
hook):
-
- hook.return_value.insert_job.side_effect = [
- MagicMock(job_id=pytest.real_job_id, error_result=False),
- pytest.real_job_id,
- ]
- hook.return_value.generate_job_id.return_value = pytest.real_job_id
- hook.return_value.split_tablename.return_value = (PROJECT_ID, DATASET,
TABLE)
- with pytest.raises(AirflowException, match=r"missing keyword argument
'source_objects'"):
+ with pytest.raises(
+ AirflowException, match=r"missing keyword argument
'destination_project_dataset_table'"
+ ):
operator = GCSToBigQueryOperator(
task_id=TASK_ID,
- destination_project_dataset_table=TEST_EXPLICIT_DEST,
schema_fields=SCHEMA_FIELDS,
bucket=TEST_BUCKET,
+ source_objects=TEST_SOURCE_OBJECTS,
max_id_key=MAX_ID_KEY,
write_disposition=WRITE_DISPOSITION,
external_table=False,
)
operator.execute(context=MagicMock())
-
@mock.patch("airflow.providers.google.cloud.transfers.gcs_to_bigquery.BigQueryHook")
- def
test_execute_should_throw_ex_when_no_destination_project_dataset_table_specified(self,
hook):
+ @mock.patch(GCS_TO_BQ_PATH.format("BigQueryHook"))
+ def test_source_format_check_should_throw_ex_when_incorrect_source_type(
+ self,
+ hook,
+ ):
hook.return_value.insert_job.side_effect = [
MagicMock(job_id=pytest.real_job_id, error_result=False),
pytest.real_job_id,
]
hook.return_value.generate_job_id.return_value = pytest.real_job_id
hook.return_value.split_tablename.return_value = (PROJECT_ID, DATASET,
TABLE)
+ hook.return_value.get_job.return_value.result.return_value = ("1",)
+
with pytest.raises(
- AirflowException, match=r"missing keyword argument
'destination_project_dataset_table'"
+ ValueError,
+ match=r"is not a valid source format.",
):
operator = GCSToBigQueryOperator(
task_id=TASK_ID,
- schema_fields=SCHEMA_FIELDS,
bucket=TEST_BUCKET,
source_objects=TEST_SOURCE_OBJECTS,
+ destination_project_dataset_table=TEST_EXPLICIT_DEST,
max_id_key=MAX_ID_KEY,
write_disposition=WRITE_DISPOSITION,
external_table=False,
+ autodetect=False,
+ source_format="incorrect",
)
operator.execute(context=MagicMock())
-
@mock.patch("airflow.providers.google.cloud.transfers.gcs_to_bigquery.GCSHook")
-
@mock.patch("airflow.providers.google.cloud.transfers.gcs_to_bigquery.BigQueryHook")
- def
test_schema_fields_scanner_external_table_should_execute_successfully(self,
bq_hook, gcs_hook):
+ @mock.patch(GCS_TO_BQ_PATH.format("GCSHook"))
+ @mock.patch(GCS_TO_BQ_PATH.format("BigQueryHook"))
+ def
test_schema_fields_integer_scanner_external_table_should_execute_successfully(
+ self, bq_hook, gcs_hook
+ ):
"""
Check detection of schema fields if schema_fields parameter is not
- specified and fields are read from source objects correctly by the
operator
- if all fields are characters. In this case operator searches for
fields in source object
- and update configuration with constructed schema_fields.
+ specified and fields are read from source objects correctly by
BigQuery if at least
+ one field includes non-string value.
"""
bq_hook.return_value.insert_job.side_effect = [
MagicMock(job_id=pytest.real_job_id, error_result=False),
@@ -1051,16 +868,15 @@ class TestGCSToBigQueryOperator(unittest.TestCase):
bq_hook.return_value.generate_job_id.return_value = pytest.real_job_id
bq_hook.return_value.split_tablename.return_value = (PROJECT_ID,
DATASET, TABLE)
bq_hook.return_value.get_job.return_value.result.return_value = ("1",)
-
- gcs_hook.return_value.download.return_value = b"id,name\r\none,Anna"
+ gcs_hook.return_value.download.return_value = b"id,name\r\n1,Anna"
operator = GCSToBigQueryOperator(
task_id=TASK_ID,
bucket=TEST_BUCKET,
source_objects=TEST_SOURCE_OBJECTS,
destination_project_dataset_table=TEST_EXPLICIT_DEST,
- max_id_key=MAX_ID_KEY,
write_disposition=WRITE_DISPOSITION,
+ max_id_key=MAX_ID_KEY,
external_table=True,
autodetect=True,
)
@@ -1069,28 +885,28 @@ class TestGCSToBigQueryOperator(unittest.TestCase):
assert result == "1"
bq_hook.return_value.create_empty_table.assert_called_once_with(
+ exists_ok=True,
+ location=None,
+ project_id=PROJECT_ID,
table_resource={
"tableReference": {"projectId": PROJECT_ID, "datasetId":
DATASET, "tableId": TABLE},
- "labels": None,
- "description": None,
+ "labels": {},
"externalDataConfiguration": {
- "source_uris":
[f"gs://{TEST_BUCKET}/{TEST_SOURCE_OBJECTS_AS_STRING}"],
- "source_format": "CSV",
- "maxBadRecords": 0,
"autodetect": True,
+ "sourceFormat": "CSV",
+ "sourceUris":
[f"gs://{TEST_BUCKET}/{TEST_SOURCE_OBJECTS}"],
"compression": "NONE",
+ "ignoreUnknownValues": False,
"csvOptions": {
- "fieldDelimeter": ",",
- "skipLeadingRows": 1,
+ "skipLeadingRows": None,
+ "fieldDelimiter": ",",
"quote": None,
"allowQuotedNewlines": False,
"allowJaggedRows": False,
+ "encoding": "UTF-8",
},
},
- "location": None,
- "encryptionConfiguration": None,
- "schema": {"fields": SCHEMA_FIELDS},
- }
+ },
)
bq_hook.return_value.insert_job.assert_called_once_with(
configuration={
@@ -1103,16 +919,15 @@ class TestGCSToBigQueryOperator(unittest.TestCase):
project_id=bq_hook.return_value.project_id,
)
-
@mock.patch("airflow.providers.google.cloud.transfers.gcs_to_bigquery.GCSHook")
-
@mock.patch("airflow.providers.google.cloud.transfers.gcs_to_bigquery.BigQueryHook")
- def
test_schema_fields_scanner_without_external_table_should_execute_successfully(
+ @mock.patch(GCS_TO_BQ_PATH.format("GCSHook"))
+ @mock.patch(GCS_TO_BQ_PATH.format("BigQueryHook"))
+ def
test_schema_fields_integer_scanner_without_external_table_should_execute_successfully(
self, bq_hook, gcs_hook
):
"""
Check detection of schema fields if schema_fields parameter is not
- specified and fields are read from source objects correctly by the
operator
- if all fields are characters. In this case operator searches for
fields in source object
- and update configuration with constructed schema_fields.
+ specified and fields are read from source objects correctly by
BigQuery if at least
+ one field includes non-string value.
"""
bq_hook.return_value.insert_job.side_effect = [
MagicMock(job_id=pytest.real_job_id, error_result=False),
@@ -1121,8 +936,7 @@ class TestGCSToBigQueryOperator(unittest.TestCase):
bq_hook.return_value.generate_job_id.return_value = pytest.real_job_id
bq_hook.return_value.split_tablename.return_value = (PROJECT_ID,
DATASET, TABLE)
bq_hook.return_value.get_job.return_value.result.return_value = ("1",)
-
- gcs_hook.return_value.download.return_value = b"id,name\r\none,Anna"
+ gcs_hook.return_value.download.return_value = b"id,name\r\n1,Anna"
operator = GCSToBigQueryOperator(
task_id=TASK_ID,
@@ -1141,35 +955,27 @@ class TestGCSToBigQueryOperator(unittest.TestCase):
calls = [
call(
configuration={
- "load": dict(
- autodetect=True,
- createDisposition="CREATE_IF_NEEDED",
- destinationTable={"projectId": PROJECT_ID,
"datasetId": DATASET, "tableId": TABLE},
- destinationTableProperties={
- "description": None,
- "labels": None,
- },
- sourceFormat="CSV",
- skipLeadingRows=1,
-
sourceUris=[f"gs://{TEST_BUCKET}/{TEST_SOURCE_OBJECTS_AS_STRING}"],
- writeDisposition=WRITE_DISPOSITION,
- ignoreUnknownValues=False,
- allowQuotedNewlines=False,
- encoding="UTF-8",
- schema={"fields": SCHEMA_FIELDS},
- allowJaggedRows=False,
- fieldDelimiter=",",
- maxBadRecords=0,
- quote=None,
- schemaUpdateOptions=(),
- ),
+ "load": {
+ "autodetect": True,
+ "createDisposition": "CREATE_IF_NEEDED",
+ "destinationTable": {"projectId": PROJECT_ID,
"datasetId": DATASET, "tableId": TABLE},
+ "sourceFormat": "CSV",
+ "sourceUris":
[f"gs://{TEST_BUCKET}/{TEST_SOURCE_OBJECTS}"],
+ "writeDisposition": WRITE_DISPOSITION,
+ "ignoreUnknownValues": False,
+ "skipLeadingRows": None,
+ "fieldDelimiter": ",",
+ "quote": None,
+ "allowQuotedNewlines": False,
+ "encoding": "UTF-8",
+ }
},
- project_id=bq_hook.return_value.project_id,
- location=None,
job_id=pytest.real_job_id,
- timeout=None,
- retry=DEFAULT_RETRY,
+ location=None,
nowait=True,
+ project_id=bq_hook.return_value.project_id,
+ retry=DEFAULT_RETRY,
+ timeout=None,
),
call(
configuration={
@@ -1185,11 +991,8 @@ class TestGCSToBigQueryOperator(unittest.TestCase):
bq_hook.return_value.insert_job.assert_has_calls(calls)
-
@mock.patch("airflow.providers.google.cloud.transfers.gcs_to_bigquery.BigQueryHook")
- def
test_schema_fields_scanner_external_table_should_throw_ex_when_autodetect_not_specified(
- self,
- hook,
- ):
+ @mock.patch(GCS_TO_BQ_PATH.format("BigQueryHook"))
+ def
test_schema_fields_without_external_table_should_execute_successfully(self,
hook):
hook.return_value.insert_job.side_effect = [
MagicMock(job_id=pytest.real_job_id, error_result=False),
pytest.real_job_id,
@@ -1198,24 +1001,50 @@ class TestGCSToBigQueryOperator(unittest.TestCase):
hook.return_value.split_tablename.return_value = (PROJECT_ID, DATASET,
TABLE)
hook.return_value.get_job.return_value.result.return_value = ("1",)
- with pytest.raises(RuntimeError, match=r"Table schema was not found."):
- operator = GCSToBigQueryOperator(
- task_id=TASK_ID,
- bucket=TEST_BUCKET,
- source_objects=TEST_SOURCE_OBJECTS,
- destination_project_dataset_table=TEST_EXPLICIT_DEST,
- max_id_key=MAX_ID_KEY,
- write_disposition=WRITE_DISPOSITION,
- external_table=True,
- autodetect=False,
+ operator = GCSToBigQueryOperator(
+ task_id=TASK_ID,
+ bucket=TEST_BUCKET,
+ source_objects=TEST_SOURCE_OBJECTS,
+ destination_project_dataset_table=TEST_EXPLICIT_DEST,
+ write_disposition=WRITE_DISPOSITION,
+ schema_fields=SCHEMA_FIELDS_INT,
+ external_table=False,
+ autodetect=True,
+ )
+
+ operator.execute(context=MagicMock())
+ calls = [
+ call(
+ configuration={
+ "load": {
+ "autodetect": True,
+ "createDisposition": "CREATE_IF_NEEDED",
+ "destinationTable": {"projectId": PROJECT_ID,
"datasetId": DATASET, "tableId": TABLE},
+ "sourceFormat": "CSV",
+ "sourceUris":
[f"gs://{TEST_BUCKET}/{TEST_SOURCE_OBJECTS}"],
+ "writeDisposition": WRITE_DISPOSITION,
+ "ignoreUnknownValues": False,
+ "schema": {"fields": SCHEMA_FIELDS_INT},
+ "skipLeadingRows": None,
+ "fieldDelimiter": ",",
+ "quote": None,
+ "allowQuotedNewlines": False,
+ "encoding": "UTF-8",
+ }
+ },
+ job_id=pytest.real_job_id,
+ location=None,
+ nowait=True,
+ project_id=hook.return_value.project_id,
+ retry=DEFAULT_RETRY,
+ timeout=None,
)
- operator.execute(context=MagicMock())
+ ]
-
@mock.patch("airflow.providers.google.cloud.transfers.gcs_to_bigquery.BigQueryHook")
- def
test_schema_fields_scanner_without_external_table_should_throw_ex_when_autodetect_not_specified(
- self,
- hook,
- ):
+ hook.return_value.insert_job.assert_has_calls(calls)
+
+ @mock.patch(GCS_TO_BQ_PATH.format("BigQueryHook"))
+ def test_schema_fields_external_table_should_execute_successfully(self,
hook):
hook.return_value.insert_job.side_effect = [
MagicMock(job_id=pytest.real_job_id, error_result=False),
pytest.real_job_id,
@@ -1224,104 +1053,56 @@ class TestGCSToBigQueryOperator(unittest.TestCase):
hook.return_value.split_tablename.return_value = (PROJECT_ID, DATASET,
TABLE)
hook.return_value.get_job.return_value.result.return_value = ("1",)
- with pytest.raises(RuntimeError, match=r"Table schema was not found."):
- operator = GCSToBigQueryOperator(
- task_id=TASK_ID,
- bucket=TEST_BUCKET,
- source_objects=TEST_SOURCE_OBJECTS,
- destination_project_dataset_table=TEST_EXPLICIT_DEST,
- max_id_key=MAX_ID_KEY,
- write_disposition=WRITE_DISPOSITION,
- external_table=False,
- autodetect=False,
- )
- operator.execute(context=MagicMock())
-
-
@mock.patch("airflow.providers.google.cloud.transfers.gcs_to_bigquery.GCSHook")
-
@mock.patch("airflow.providers.google.cloud.transfers.gcs_to_bigquery.BigQueryHook")
- def
test_schema_fields_integer_scanner_external_table_should_execute_successfully(
- self, bq_hook, gcs_hook
- ):
- """
- Check detection of schema fields if schema_fields parameter is not
- specified and fields are read from source objects correctly by
BigQuery if at least
- one field includes non-string value.
- """
- bq_hook.return_value.insert_job.side_effect = [
- MagicMock(job_id=pytest.real_job_id, error_result=False),
- pytest.real_job_id,
- ]
- bq_hook.return_value.generate_job_id.return_value = pytest.real_job_id
- bq_hook.return_value.split_tablename.return_value = (PROJECT_ID,
DATASET, TABLE)
- bq_hook.return_value.get_job.return_value.result.return_value = ("1",)
- gcs_hook.return_value.download.return_value = b"id,name\r\n1,Anna"
-
operator = GCSToBigQueryOperator(
task_id=TASK_ID,
bucket=TEST_BUCKET,
source_objects=TEST_SOURCE_OBJECTS,
destination_project_dataset_table=TEST_EXPLICIT_DEST,
write_disposition=WRITE_DISPOSITION,
- max_id_key=MAX_ID_KEY,
+ schema_fields=SCHEMA_FIELDS_INT,
external_table=True,
autodetect=True,
)
- result = operator.execute(context=MagicMock())
-
- assert result == "1"
- bq_hook.return_value.create_empty_table.assert_called_once_with(
+ operator.execute(context=MagicMock())
+ hook.return_value.create_empty_table.assert_called_once_with(
+ exists_ok=True,
+ location=None,
+ project_id=PROJECT_ID,
table_resource={
"tableReference": {"projectId": PROJECT_ID, "datasetId":
DATASET, "tableId": TABLE},
- "labels": None,
- "description": None,
+ "labels": {},
"externalDataConfiguration": {
- "source_uris":
[f"gs://{TEST_BUCKET}/{TEST_SOURCE_OBJECTS_AS_STRING}"],
- "source_format": "CSV",
- "maxBadRecords": 0,
"autodetect": True,
+ "sourceFormat": "CSV",
+ "sourceUris":
[f"gs://{TEST_BUCKET}/{TEST_SOURCE_OBJECTS}"],
"compression": "NONE",
+ "ignoreUnknownValues": False,
"csvOptions": {
- "fieldDelimeter": ",",
"skipLeadingRows": None,
+ "fieldDelimiter": ",",
"quote": None,
"allowQuotedNewlines": False,
"allowJaggedRows": False,
+ "encoding": "UTF-8",
},
+ "schema": {"fields": SCHEMA_FIELDS_INT},
},
- "location": None,
- "encryptionConfiguration": None,
- }
- )
- bq_hook.return_value.insert_job.assert_called_once_with(
- configuration={
- "query": {
- "query": f"SELECT MAX({MAX_ID_KEY}) AS max_value FROM
{TEST_EXPLICIT_DEST}",
- "useLegacySql": False,
- "schemaUpdateOptions": [],
- }
- },
- project_id=bq_hook.return_value.project_id,
+ },
)
-
@mock.patch("airflow.providers.google.cloud.transfers.gcs_to_bigquery.GCSHook")
-
@mock.patch("airflow.providers.google.cloud.transfers.gcs_to_bigquery.BigQueryHook")
- def
test_schema_fields_integer_scanner_without_external_table_should_execute_successfully(
- self, bq_hook, gcs_hook
- ):
+
+class TestAsyncGCSToBigQueryOperator(unittest.TestCase):
+ @mock.patch(GCS_TO_BQ_PATH.format("BigQueryHook"))
+ def
test_execute_without_external_table_async_should_execute_successfully(self,
hook):
"""
- Check detection of schema fields if schema_fields parameter is not
- specified and fields are read from source objects correctly by
BigQuery if at least
- one field includes non-string value.
+ Asserts that a task is deferred and a BigQueryInsertJobTrigger will be
fired
+ when Operator is executed in deferrable.
"""
- bq_hook.return_value.insert_job.side_effect = [
- MagicMock(job_id=pytest.real_job_id, error_result=False),
- pytest.real_job_id,
- ]
- bq_hook.return_value.generate_job_id.return_value = pytest.real_job_id
- bq_hook.return_value.split_tablename.return_value = (PROJECT_ID,
DATASET, TABLE)
- bq_hook.return_value.get_job.return_value.result.return_value = ("1",)
- gcs_hook.return_value.download.return_value = b"id,name\r\n1,Anna"
+ hook.return_value.insert_job.return_value =
MagicMock(job_id=pytest.real_job_id, error_result=False)
+ hook.return_value.generate_job_id.return_value = pytest.real_job_id
+ hook.return_value.split_tablename.return_value = (PROJECT_ID, DATASET,
TABLE)
+ hook.return_value.get_job.return_value.result.return_value = ("1",)
operator = GCSToBigQueryOperator(
task_id=TASK_ID,
@@ -1329,69 +1110,76 @@ class TestGCSToBigQueryOperator(unittest.TestCase):
source_objects=TEST_SOURCE_OBJECTS,
destination_project_dataset_table=TEST_EXPLICIT_DEST,
write_disposition=WRITE_DISPOSITION,
- max_id_key=MAX_ID_KEY,
+ schema_fields=SCHEMA_FIELDS,
external_table=False,
autodetect=True,
+ deferrable=True,
)
- result = operator.execute(context=MagicMock())
+ with pytest.raises(TaskDeferred) as exc:
+ operator.execute(self.create_context(operator))
- assert result == "1"
- calls = [
- call(
- configuration={
- "load": dict(
- autodetect=True,
- createDisposition="CREATE_IF_NEEDED",
- destinationTable={"projectId": PROJECT_ID,
"datasetId": DATASET, "tableId": TABLE},
- destinationTableProperties={
- "description": None,
- "labels": None,
- },
- sourceFormat="CSV",
- skipLeadingRows=None,
-
sourceUris=[f"gs://{TEST_BUCKET}/{TEST_SOURCE_OBJECTS_AS_STRING}"],
- writeDisposition=WRITE_DISPOSITION,
- ignoreUnknownValues=False,
- allowQuotedNewlines=False,
- encoding="UTF-8",
- allowJaggedRows=False,
- fieldDelimiter=",",
- maxBadRecords=0,
- quote=None,
- schemaUpdateOptions=(),
- ),
- },
- project_id=bq_hook.return_value.project_id,
- location=None,
- job_id=pytest.real_job_id,
- timeout=None,
- retry=DEFAULT_RETRY,
- nowait=True,
- ),
- call(
- configuration={
- "query": {
- "query": f"SELECT MAX({MAX_ID_KEY}) AS max_value FROM
{TEST_EXPLICIT_DEST}",
- "useLegacySql": False,
- "schemaUpdateOptions": [],
- }
- },
- project_id=bq_hook.return_value.project_id,
- ),
- ]
+ assert isinstance(
+ exc.value.trigger, BigQueryInsertJobTrigger
+ ), "Trigger is not a BigQueryInsertJobTrigger"
- bq_hook.return_value.insert_job.assert_has_calls(calls)
+ def
test_execute_without_external_table_async_should_throw_ex_when_event_status_error(self):
+ """
+ Tests that an AirflowException is raised in case of error event.
+ """
-
@mock.patch("airflow.providers.google.cloud.transfers.gcs_to_bigquery.BigQueryHook")
- def
test_schema_fields_without_external_table_should_execute_successfully(self,
hook):
- hook.return_value.insert_job.side_effect = [
- MagicMock(job_id=pytest.real_job_id, error_result=False),
- pytest.real_job_id,
- ]
- hook.return_value.generate_job_id.return_value = pytest.real_job_id
+ with pytest.raises(AirflowException):
+ operator = GCSToBigQueryOperator(
+ task_id=TASK_ID,
+ bucket=TEST_BUCKET,
+ source_objects=TEST_SOURCE_OBJECTS,
+ destination_project_dataset_table=TEST_EXPLICIT_DEST,
+ write_disposition=WRITE_DISPOSITION,
+ schema_fields=SCHEMA_FIELDS,
+ external_table=False,
+ autodetect=True,
+ deferrable=True,
+ )
+ operator.execute_complete(
+ context=None, event={"status": "error", "message": "test
failure message"}
+ )
+
+ def
test_execute_logging_without_external_table_async_should_execute_successfully(self):
+ """
+ Asserts that logging occurs as expected.
+ """
+
+ operator = GCSToBigQueryOperator(
+ task_id=TASK_ID,
+ bucket=TEST_BUCKET,
+ source_objects=TEST_SOURCE_OBJECTS,
+ destination_project_dataset_table=TEST_EXPLICIT_DEST,
+ write_disposition=WRITE_DISPOSITION,
+ schema_fields=SCHEMA_FIELDS,
+ external_table=False,
+ autodetect=True,
+ deferrable=True,
+ )
+ with mock.patch.object(operator.log, "info") as mock_log_info:
+ operator.execute_complete(
+ context=self.create_context(operator),
+ event={"status": "success", "message": "Job completed",
"job_id": job_id},
+ )
+ mock_log_info.assert_called_with(
+ "%s completed with response %s ", "test-gcs-to-bq-operator", "Job
completed"
+ )
+
+ @mock.patch(GCS_TO_BQ_PATH.format("BigQueryHook"))
+ def
test_execute_without_external_table_generate_job_id_async_should_execute_successfully(self,
hook):
+ hook.return_value.insert_job.side_effect = Conflict("any")
hook.return_value.split_tablename.return_value = (PROJECT_ID, DATASET,
TABLE)
- hook.return_value.get_job.return_value.result.return_value = ("1",)
+ job = MagicMock(
+ job_id=pytest.real_job_id,
+ error_result=False,
+ state="PENDING",
+ done=lambda: False,
+ )
+ hook.return_value.get_job.return_value = job
operator = GCSToBigQueryOperator(
task_id=TASK_ID,
@@ -1399,58 +1187,38 @@ class TestGCSToBigQueryOperator(unittest.TestCase):
source_objects=TEST_SOURCE_OBJECTS,
destination_project_dataset_table=TEST_EXPLICIT_DEST,
write_disposition=WRITE_DISPOSITION,
- schema_fields=SCHEMA_FIELDS_INT,
+ schema_fields=SCHEMA_FIELDS,
+ reattach_states={"PENDING"},
external_table=False,
autodetect=True,
+ deferrable=True,
)
- operator.execute(context=MagicMock())
- calls = [
- call(
- configuration={
- "load": dict(
- autodetect=True,
- createDisposition="CREATE_IF_NEEDED",
- destinationTable={"projectId": PROJECT_ID,
"datasetId": DATASET, "tableId": TABLE},
- destinationTableProperties={
- "description": None,
- "labels": None,
- },
- sourceFormat="CSV",
- skipLeadingRows=None,
-
sourceUris=[f"gs://{TEST_BUCKET}/{TEST_SOURCE_OBJECTS_AS_STRING}"],
- writeDisposition=WRITE_DISPOSITION,
- ignoreUnknownValues=False,
- allowQuotedNewlines=False,
- encoding="UTF-8",
- schema={"fields": SCHEMA_FIELDS_INT},
- allowJaggedRows=False,
- fieldDelimiter=",",
- maxBadRecords=0,
- quote=None,
- schemaUpdateOptions=(),
- ),
- },
- project_id=hook.return_value.project_id,
- location=None,
- job_id=pytest.real_job_id,
- timeout=None,
- retry=DEFAULT_RETRY,
- nowait=True,
- ),
- ]
+ with pytest.raises(TaskDeferred):
+ operator.execute(self.create_context(operator))
- hook.return_value.insert_job.assert_has_calls(calls)
+ hook.return_value.generate_job_id.assert_called_once_with(
+ job_id=None,
+ dag_id="adhoc_airflow",
+ task_id=TASK_ID,
+ logical_date=datetime(2022, 1, 1, 0, 0),
+ configuration={},
+ force_rerun=True,
+ )
-
@mock.patch("airflow.providers.google.cloud.transfers.gcs_to_bigquery.BigQueryHook")
- def test_schema_fields_external_table_should_execute_successfully(self,
hook):
- hook.return_value.insert_job.side_effect = [
- MagicMock(job_id=pytest.real_job_id, error_result=False),
- pytest.real_job_id,
- ]
+ @mock.patch(GCS_TO_BQ_PATH.format("BigQueryHook"))
+ def
test_execute_without_external_table_reattach_async_should_execute_successfully(self,
hook):
hook.return_value.generate_job_id.return_value = pytest.real_job_id
+
+ hook.return_value.insert_job.side_effect = Conflict("any")
hook.return_value.split_tablename.return_value = (PROJECT_ID, DATASET,
TABLE)
- hook.return_value.get_job.return_value.result.return_value = ("1",)
+ job = MagicMock(
+ job_id=pytest.real_job_id,
+ error_result=False,
+ state="PENDING",
+ done=lambda: False,
+ )
+ hook.return_value.get_job.return_value = job
operator = GCSToBigQueryOperator(
task_id=TASK_ID,
@@ -1458,75 +1226,39 @@ class TestGCSToBigQueryOperator(unittest.TestCase):
source_objects=TEST_SOURCE_OBJECTS,
destination_project_dataset_table=TEST_EXPLICIT_DEST,
write_disposition=WRITE_DISPOSITION,
- schema_fields=SCHEMA_FIELDS_INT,
- external_table=True,
+ schema_fields=SCHEMA_FIELDS,
+ location=TEST_DATASET_LOCATION,
+ reattach_states={"PENDING"},
+ external_table=False,
autodetect=True,
+ deferrable=True,
)
- operator.execute(context=MagicMock())
- hook.return_value.create_empty_table.assert_called_once_with(
- table_resource={
- "tableReference": {"projectId": PROJECT_ID, "datasetId":
DATASET, "tableId": TABLE},
- "labels": None,
- "description": None,
- "externalDataConfiguration": {
- "source_uris":
[f"gs://{TEST_BUCKET}/{TEST_SOURCE_OBJECTS_AS_STRING}"],
- "source_format": "CSV",
- "maxBadRecords": 0,
- "autodetect": True,
- "compression": "NONE",
- "csvOptions": {
- "fieldDelimeter": ",",
- "skipLeadingRows": None,
- "quote": None,
- "allowQuotedNewlines": False,
- "allowJaggedRows": False,
- },
- },
- "location": None,
- "encryptionConfiguration": None,
- "schema": {"fields": SCHEMA_FIELDS_INT},
- }
+ with pytest.raises(TaskDeferred):
+ operator.execute(self.create_context(operator))
+
+ hook.return_value.get_job.assert_called_once_with(
+ location=TEST_DATASET_LOCATION,
+ job_id=pytest.real_job_id,
+ project_id=hook.return_value.project_id,
)
+ job._begin.assert_called_once_with()
+
+ @mock.patch(GCS_TO_BQ_PATH.format("BigQueryHook"))
+ def
test_execute_without_external_table_force_rerun_async_should_execute_successfully(self,
hook):
+ hook.return_value.generate_job_id.return_value = f"{job_id}_{hash_}"
+ hook.return_value.split_tablename.return_value = (PROJECT_ID, DATASET,
TABLE)
+
+ hook.return_value.insert_job.side_effect = Conflict("any")
+ job = MagicMock(
+ job_id=pytest.real_job_id,
+ error_result=False,
+ state="DONE",
+ done=lambda: False,
+ )
+ hook.return_value.get_job.return_value = job
[email protected]("airflow.providers.google.cloud.transfers.gcs_to_bigquery.BigQueryHook")
-def
test_execute_without_external_table_async_should_execute_successfully(hook):
- """
- Asserts that a task is deferred and a BigQueryInsertJobTrigger will be
fired
- when Operator is executed in deferrable.
- """
- hook.return_value.insert_job.return_value =
MagicMock(job_id=pytest.real_job_id, error_result=False)
- hook.return_value.generate_job_id.return_value = pytest.real_job_id
- hook.return_value.split_tablename.return_value = (PROJECT_ID, DATASET,
TABLE)
- hook.return_value.get_job.return_value.result.return_value = ("1",)
-
- operator = GCSToBigQueryOperator(
- task_id=TASK_ID,
- bucket=TEST_BUCKET,
- source_objects=TEST_SOURCE_OBJECTS,
- destination_project_dataset_table=TEST_EXPLICIT_DEST,
- write_disposition=WRITE_DISPOSITION,
- schema_fields=SCHEMA_FIELDS,
- external_table=False,
- autodetect=True,
- deferrable=True,
- )
-
- with pytest.raises(TaskDeferred) as exc:
- operator.execute(create_context(operator))
-
- assert isinstance(
- exc.value.trigger, BigQueryInsertJobTrigger
- ), "Trigger is not a BigQueryInsertJobTrigger"
-
-
-def
test_execute_without_external_table_async_should_throw_ex_when_event_status_error():
- """
- Tests that an AirflowException is raised in case of error event.
- """
-
- with pytest.raises(AirflowException):
operator = GCSToBigQueryOperator(
task_id=TASK_ID,
bucket=TEST_BUCKET,
@@ -1534,317 +1266,196 @@ def
test_execute_without_external_table_async_should_throw_ex_when_event_status_
destination_project_dataset_table=TEST_EXPLICIT_DEST,
write_disposition=WRITE_DISPOSITION,
schema_fields=SCHEMA_FIELDS,
+ location=TEST_DATASET_LOCATION,
+ reattach_states={"PENDING"},
external_table=False,
autodetect=True,
deferrable=True,
)
- operator.execute_complete(context=None, event={"status": "error",
"message": "test failure message"})
-
-
-def
test_execute_logging_without_external_table_async_should_execute_successfully():
- """
- Asserts that logging occurs as expected.
- """
-
- operator = GCSToBigQueryOperator(
- task_id=TASK_ID,
- bucket=TEST_BUCKET,
- source_objects=TEST_SOURCE_OBJECTS,
- destination_project_dataset_table=TEST_EXPLICIT_DEST,
- write_disposition=WRITE_DISPOSITION,
- schema_fields=SCHEMA_FIELDS,
- external_table=False,
- autodetect=True,
- deferrable=True,
- )
- with mock.patch.object(operator.log, "info") as mock_log_info:
- operator.execute_complete(
- context=create_context(operator),
- event={"status": "success", "message": "Job completed", "job_id":
job_id},
+
+ with pytest.raises(AirflowException) as exc:
+ operator.execute(self.create_context(operator))
+
+ expected_exception_msg = (
+ f"Job with id: {pytest.real_job_id} already exists and is in
{job.state} state. "
+ f"If you want to force rerun it consider setting
`force_rerun=True`."
+ f"Or, if you want to reattach in this scenario add {job.state} to
`reattach_states`"
)
- mock_log_info.assert_called_with(
- "%s completed with response %s ", "test-gcs-to-bq-operator", "Job
completed"
- )
-
-
[email protected]("airflow.providers.google.cloud.transfers.gcs_to_bigquery.BigQueryHook")
-def
test_execute_without_external_table_generate_job_id_async_should_execute_successfully(hook):
- hook.return_value.insert_job.side_effect = Conflict("any")
- hook.return_value.split_tablename.return_value = (PROJECT_ID, DATASET,
TABLE)
- job = MagicMock(
- job_id=pytest.real_job_id,
- error_result=False,
- state="PENDING",
- done=lambda: False,
- )
- hook.return_value.get_job.return_value = job
-
- operator = GCSToBigQueryOperator(
- task_id=TASK_ID,
- bucket=TEST_BUCKET,
- source_objects=TEST_SOURCE_OBJECTS,
- destination_project_dataset_table=TEST_EXPLICIT_DEST,
- write_disposition=WRITE_DISPOSITION,
- schema_fields=SCHEMA_FIELDS,
- reattach_states={"PENDING"},
- external_table=False,
- autodetect=True,
- deferrable=True,
- )
-
- with pytest.raises(TaskDeferred):
- operator.execute(create_context(operator))
-
- hook.return_value.generate_job_id.assert_called_once_with(
- job_id=None,
- dag_id="adhoc_airflow",
- task_id=TASK_ID,
- logical_date=datetime(2022, 1, 1, 0, 0),
- configuration={},
- force_rerun=True,
- )
-
-
[email protected]("airflow.providers.google.cloud.transfers.gcs_to_bigquery.BigQueryHook")
-def
test_execute_without_external_table_reattach_async_should_execute_successfully(hook):
- hook.return_value.generate_job_id.return_value = pytest.real_job_id
-
- hook.return_value.insert_job.side_effect = Conflict("any")
- hook.return_value.split_tablename.return_value = (PROJECT_ID, DATASET,
TABLE)
- job = MagicMock(
- job_id=pytest.real_job_id,
- error_result=False,
- state="PENDING",
- done=lambda: False,
- )
- hook.return_value.get_job.return_value = job
-
- operator = GCSToBigQueryOperator(
- task_id=TASK_ID,
- bucket=TEST_BUCKET,
- source_objects=TEST_SOURCE_OBJECTS,
- destination_project_dataset_table=TEST_EXPLICIT_DEST,
- write_disposition=WRITE_DISPOSITION,
- schema_fields=SCHEMA_FIELDS,
- location=TEST_DATASET_LOCATION,
- reattach_states={"PENDING"},
- external_table=False,
- autodetect=True,
- deferrable=True,
- )
-
- with pytest.raises(TaskDeferred):
- operator.execute(create_context(operator))
-
- hook.return_value.get_job.assert_called_once_with(
- location=TEST_DATASET_LOCATION,
- job_id=pytest.real_job_id,
- project_id=hook.return_value.project_id,
- )
-
- job._begin.assert_called_once_with()
-
-
[email protected]("airflow.providers.google.cloud.transfers.gcs_to_bigquery.BigQueryHook")
-def
test_execute_without_external_table_force_rerun_async_should_execute_successfully(hook):
- hook.return_value.generate_job_id.return_value = f"{job_id}_{hash_}"
- hook.return_value.split_tablename.return_value = (PROJECT_ID, DATASET,
TABLE)
-
- hook.return_value.insert_job.side_effect = Conflict("any")
- job = MagicMock(
- job_id=pytest.real_job_id,
- error_result=False,
- state="DONE",
- done=lambda: False,
- )
- hook.return_value.get_job.return_value = job
-
- operator = GCSToBigQueryOperator(
- task_id=TASK_ID,
- bucket=TEST_BUCKET,
- source_objects=TEST_SOURCE_OBJECTS,
- destination_project_dataset_table=TEST_EXPLICIT_DEST,
- write_disposition=WRITE_DISPOSITION,
- schema_fields=SCHEMA_FIELDS,
- location=TEST_DATASET_LOCATION,
- reattach_states={"PENDING"},
- external_table=False,
- autodetect=True,
- deferrable=True,
- )
-
- with pytest.raises(AirflowException) as exc:
- operator.execute(create_context(operator))
-
- expected_exception_msg = (
- f"Job with id: {pytest.real_job_id} already exists and is in
{job.state} state. "
- f"If you want to force rerun it consider setting `force_rerun=True`."
- f"Or, if you want to reattach in this scenario add {job.state} to
`reattach_states`"
- )
-
- assert str(exc.value) == expected_exception_msg
-
- hook.return_value.get_job.assert_called_once_with(
- location=TEST_DATASET_LOCATION,
- job_id=pytest.real_job_id,
- project_id=hook.return_value.project_id,
- )
-
-
[email protected]("airflow.providers.google.cloud.transfers.gcs_to_bigquery.GCSHook")
[email protected]("airflow.providers.google.cloud.transfers.gcs_to_bigquery.BigQueryHook")
-def
test_schema_fields_without_external_table_async_should_execute_successfully(bq_hook,
gcs_hook):
- bq_hook.return_value.insert_job.return_value =
MagicMock(job_id=pytest.real_job_id, error_result=False)
- bq_hook.return_value.generate_job_id.return_value = pytest.real_job_id
- bq_hook.return_value.split_tablename.return_value = (PROJECT_ID, DATASET,
TABLE)
- bq_hook.return_value.get_job.return_value.result.return_value = ("1",)
- gcs_hook.return_value.download.return_value = b"id,name\r\none,Anna"
-
- operator = GCSToBigQueryOperator(
- task_id=TASK_ID,
- bucket=TEST_BUCKET,
- source_objects=TEST_SOURCE_OBJECTS,
- destination_project_dataset_table=TEST_EXPLICIT_DEST,
- write_disposition=WRITE_DISPOSITION,
- schema_fields=SCHEMA_FIELDS,
- max_id_key=MAX_ID_KEY,
- external_table=False,
- autodetect=True,
- deferrable=True,
- )
-
- with pytest.raises(TaskDeferred):
- result = operator.execute(create_context(operator))
- assert result == "1"
- calls = [
- call(
- configuration={
- "load": dict(
- autodetect=True,
- createDisposition="CREATE_IF_NEEDED",
- destinationTable={"projectId": PROJECT_ID,
"datasetId": DATASET, "tableId": TABLE},
- destinationTableProperties={
- "description": None,
- "labels": None,
- },
- sourceFormat="CSV",
- skipLeadingRows=None,
-
sourceUris=[f"gs://{TEST_BUCKET}/{TEST_SOURCE_OBJECTS_AS_STRING}"],
- writeDisposition=WRITE_DISPOSITION,
- ignoreUnknownValues=False,
- allowQuotedNewlines=False,
- encoding="UTF-8",
- schema={"fields": SCHEMA_FIELDS},
- ),
- },
- project_id=bq_hook.return_value.project_id,
- location=None,
- job_id=pytest.real_job_id,
- timeout=None,
- retry=DEFAULT_RETRY,
- nowait=True,
- ),
- call(
- configuration={
- "query": {
- "query": f"SELECT MAX({MAX_ID_KEY}) AS max_value FROM
{TEST_EXPLICIT_DEST}",
- "useLegacySql": False,
- "schemaUpdateOptions": [],
- }
- },
- project_id=bq_hook.return_value.project_id,
- ),
- ]
+ assert str(exc.value) == expected_exception_msg
- bq_hook.return_value.insert_job.assert_has_calls(calls)
+ hook.return_value.get_job.assert_called_once_with(
+ location=TEST_DATASET_LOCATION,
+ job_id=pytest.real_job_id,
+ project_id=hook.return_value.project_id,
+ )
+
+ @mock.patch(GCS_TO_BQ_PATH.format("GCSHook"))
+ @mock.patch(GCS_TO_BQ_PATH.format("BigQueryHook"))
+ def
test_schema_fields_without_external_table_async_should_execute_successfully(self,
bq_hook, gcs_hook):
+ bq_hook.return_value.insert_job.return_value = MagicMock(
+ job_id=pytest.real_job_id, error_result=False
+ )
+ bq_hook.return_value.generate_job_id.return_value = pytest.real_job_id
+ bq_hook.return_value.split_tablename.return_value = (PROJECT_ID,
DATASET, TABLE)
+ bq_hook.return_value.get_job.return_value.result.return_value = ("1",)
+ gcs_hook.return_value.download.return_value = b"id,name\r\none,Anna"
+ operator = GCSToBigQueryOperator(
+ task_id=TASK_ID,
+ bucket=TEST_BUCKET,
+ source_objects=TEST_SOURCE_OBJECTS,
+ destination_project_dataset_table=TEST_EXPLICIT_DEST,
+ write_disposition=WRITE_DISPOSITION,
+ schema_fields=SCHEMA_FIELDS,
+ max_id_key=MAX_ID_KEY,
+ external_table=False,
+ autodetect=True,
+ deferrable=True,
+ )
[email protected]("airflow.providers.google.cloud.transfers.gcs_to_bigquery.GCSHook")
[email protected]("airflow.providers.google.cloud.transfers.gcs_to_bigquery.BigQueryHook")
-def
test_schema_fields_int_without_external_table_async_should_execute_successfully(bq_hook,
gcs_hook):
- bq_hook.return_value.insert_job.return_value =
MagicMock(job_id=pytest.real_job_id, error_result=False)
- bq_hook.return_value.generate_job_id.return_value = pytest.real_job_id
- bq_hook.return_value.split_tablename.return_value = (PROJECT_ID, DATASET,
TABLE)
- bq_hook.return_value.get_job.return_value.result.return_value = ("1",)
- gcs_hook.return_value.download.return_value = b"id,name\r\n1,Anna"
-
- operator = GCSToBigQueryOperator(
- task_id=TASK_ID,
- bucket=TEST_BUCKET,
- source_objects=TEST_SOURCE_OBJECTS,
- destination_project_dataset_table=TEST_EXPLICIT_DEST,
- write_disposition=WRITE_DISPOSITION,
- schema_fields=SCHEMA_FIELDS,
- max_id_key=MAX_ID_KEY,
- external_table=False,
- autodetect=True,
- deferrable=True,
- )
-
- with pytest.raises(TaskDeferred):
- result = operator.execute(create_context(operator))
- assert result == "1"
+ with pytest.raises(TaskDeferred):
+ result = operator.execute(self.create_context(operator))
+ assert result == "1"
+
+ calls = [
+ call(
+ configuration={
+ "load": dict(
+ autodetect=True,
+ createDisposition="CREATE_IF_NEEDED",
+ destinationTable={
+ "projectId": PROJECT_ID,
+ "datasetId": DATASET,
+ "tableId": TABLE,
+ },
+ destinationTableProperties={
+ "description": None,
+ "labels": None,
+ },
+ sourceFormat="CSV",
+ skipLeadingRows=None,
+
sourceUris=[f"gs://{TEST_BUCKET}/{TEST_SOURCE_OBJECTS}"],
+ writeDisposition=WRITE_DISPOSITION,
+ ignoreUnknownValues=False,
+ allowQuotedNewlines=False,
+ encoding="UTF-8",
+ schema={"fields": SCHEMA_FIELDS},
+ ),
+ },
+ project_id=bq_hook.return_value.project_id,
+ location=None,
+ job_id=pytest.real_job_id,
+ timeout=None,
+ retry=DEFAULT_RETRY,
+ nowait=True,
+ ),
+ call(
+ configuration={
+ "query": {
+ "query": f"SELECT MAX({MAX_ID_KEY}) AS max_value
FROM {TEST_EXPLICIT_DEST}",
+ "useLegacySql": False,
+ "schemaUpdateOptions": [],
+ }
+ },
+ project_id=bq_hook.return_value.project_id,
+ ),
+ ]
- calls = [
- call(
- configuration={
- "load": dict(
- autodetect=True,
- createDisposition="CREATE_IF_NEEDED",
- destinationTable={"projectId": PROJECT_ID,
"datasetId": DATASET, "tableId": TABLE},
- destinationTableProperties={
- "description": None,
- "labels": None,
- },
- sourceFormat="CSV",
- skipLeadingRows=None,
-
sourceUris=[f"gs://{TEST_BUCKET}/{TEST_SOURCE_OBJECTS_AS_STRING}"],
- writeDisposition=WRITE_DISPOSITION,
- ignoreUnknownValues=False,
- allowQuotedNewlines=False,
- encoding="UTF-8",
- ),
- },
- project_id=bq_hook.return_value.project_id,
- location=None,
- job_id=pytest.real_job_id,
- timeout=None,
- retry=DEFAULT_RETRY,
- nowait=True,
- ),
- call(
- configuration={
- "query": {
- "query": f"SELECT MAX({MAX_ID_KEY}) AS max_value FROM
{TEST_EXPLICIT_DEST}",
- "useLegacySql": False,
- "schemaUpdateOptions": [],
- }
- },
- project_id=bq_hook.return_value.project_id,
- ),
- ]
+ bq_hook.return_value.insert_job.assert_has_calls(calls)
- bq_hook.return_value.insert_job.assert_has_calls(calls)
+ @mock.patch(GCS_TO_BQ_PATH.format("GCSHook"))
+ @mock.patch(GCS_TO_BQ_PATH.format("BigQueryHook"))
+ def
test_schema_fields_int_without_external_table_async_should_execute_successfully(
+ self, bq_hook, gcs_hook
+ ):
+ bq_hook.return_value.insert_job.return_value = MagicMock(
+ job_id=pytest.real_job_id, error_result=False
+ )
+ bq_hook.return_value.generate_job_id.return_value = pytest.real_job_id
+ bq_hook.return_value.split_tablename.return_value = (PROJECT_ID,
DATASET, TABLE)
+ bq_hook.return_value.get_job.return_value.result.return_value = ("1",)
+ gcs_hook.return_value.download.return_value = b"id,name\r\n1,Anna"
+ operator = GCSToBigQueryOperator(
+ task_id=TASK_ID,
+ bucket=TEST_BUCKET,
+ source_objects=TEST_SOURCE_OBJECTS,
+ destination_project_dataset_table=TEST_EXPLICIT_DEST,
+ write_disposition=WRITE_DISPOSITION,
+ schema_fields=SCHEMA_FIELDS,
+ max_id_key=MAX_ID_KEY,
+ external_table=False,
+ autodetect=True,
+ deferrable=True,
+ )
-def create_context(task):
- dag = DAG(dag_id="dag")
- logical_date = datetime(2022, 1, 1, 0, 0, 0)
- dag_run = DagRun(
- dag_id=dag.dag_id,
- execution_date=logical_date,
- run_id=DagRun.generate_run_id(DagRunType.MANUAL, logical_date),
- )
- task_instance = TaskInstance(task=task)
- task_instance.dag_run = dag_run
- task_instance.dag_id = dag.dag_id
- task_instance.xcom_push = mock.Mock()
- return {
- "dag": dag,
- "run_id": dag_run.run_id,
- "task": task,
- "ti": task_instance,
- "task_instance": task_instance,
- "logical_date": logical_date,
- }
+ with pytest.raises(TaskDeferred):
+ result = operator.execute(self.create_context(operator))
+ assert result == "1"
+
+ calls = [
+ call(
+ configuration={
+ "load": dict(
+ autodetect=True,
+ createDisposition="CREATE_IF_NEEDED",
+ destinationTable={
+ "projectId": PROJECT_ID,
+ "datasetId": DATASET,
+ "tableId": TABLE,
+ },
+ destinationTableProperties={
+ "description": None,
+ "labels": None,
+ },
+ sourceFormat="CSV",
+ skipLeadingRows=None,
+
sourceUris=[f"gs://{TEST_BUCKET}/{TEST_SOURCE_OBJECTS}"],
+ writeDisposition=WRITE_DISPOSITION,
+ ignoreUnknownValues=False,
+ allowQuotedNewlines=False,
+ encoding="UTF-8",
+ ),
+ },
+ project_id=bq_hook.return_value.project_id,
+ location=None,
+ job_id=pytest.real_job_id,
+ timeout=None,
+ retry=DEFAULT_RETRY,
+ nowait=True,
+ ),
+ call(
+ configuration={
+ "query": {
+ "query": f"SELECT MAX({MAX_ID_KEY}) AS max_value
FROM {TEST_EXPLICIT_DEST}",
+ "useLegacySql": False,
+ "schemaUpdateOptions": [],
+ }
+ },
+ project_id=bq_hook.return_value.project_id,
+ ),
+ ]
+
+ bq_hook.return_value.insert_job.assert_has_calls(calls)
+
+ def create_context(self, task):
+ dag = DAG(dag_id="dag")
+ logical_date = datetime(2022, 1, 1, 0, 0, 0)
+ dag_run = DagRun(
+ dag_id=dag.dag_id,
+ execution_date=logical_date,
+ run_id=DagRun.generate_run_id(DagRunType.MANUAL, logical_date),
+ )
+ task_instance = TaskInstance(task=task)
+ task_instance.dag_run = dag_run
+ task_instance.dag_id = dag.dag_id
+ task_instance.xcom_push = mock.Mock()
+ return {
+ "dag": dag,
+ "run_id": dag_run.run_id,
+ "task": task,
+ "ti": task_instance,
+ "task_instance": task_instance,
+ "logical_date": logical_date,
+ }
diff --git
a/tests/system/providers/google/cloud/gcs/example_gcs_to_bigquery_async.py
b/tests/system/providers/google/cloud/gcs/example_gcs_to_bigquery_async.py
index ebfebab585..5888056456 100644
--- a/tests/system/providers/google/cloud/gcs/example_gcs_to_bigquery_async.py
+++ b/tests/system/providers/google/cloud/gcs/example_gcs_to_bigquery_async.py
@@ -37,8 +37,12 @@ DAG_ID = "gcs_to_bigquery_operator_async"
DATASET_NAME_STR = f"dataset_{DAG_ID}_{ENV_ID}_STR"
DATASET_NAME_DATE = f"dataset_{DAG_ID}_{ENV_ID}_DATE"
+DATASET_NAME_JSON = f"dataset_{DAG_ID}_{ENV_ID}_JSON"
+DATASET_NAME_DELIMITER = f"dataset_{DAG_ID}_{ENV_ID}_DELIMITER"
TABLE_NAME_STR = "test_str"
TABLE_NAME_DATE = "test_date"
+TABLE_NAME_JSON = "test_json"
+TABLE_NAME_DELIMITER = "test_delimiter"
MAX_ID_STR = "name"
MAX_ID_DATE = "date"
@@ -49,14 +53,24 @@ with models.DAG(
catchup=False,
tags=["example", "gcs"],
) as dag:
- create_test_dataset_for_string_fileds = BigQueryCreateEmptyDatasetOperator(
+ create_test_dataset_for_string_fields = BigQueryCreateEmptyDatasetOperator(
task_id="create_airflow_test_dataset_str",
dataset_id=DATASET_NAME_STR, project_id=PROJECT_ID
)
- create_test_dataset_for_date_fileds = BigQueryCreateEmptyDatasetOperator(
+ create_test_dataset_for_date_fields = BigQueryCreateEmptyDatasetOperator(
task_id="create_airflow_test_dataset_date",
dataset_id=DATASET_NAME_DATE, project_id=PROJECT_ID
)
+ create_test_dataset_for_json_fields = BigQueryCreateEmptyDatasetOperator(
+ task_id="create_airflow_test_dataset_json",
dataset_id=DATASET_NAME_JSON, project_id=PROJECT_ID
+ )
+
+ create_test_dataset_for_delimiter_fields =
BigQueryCreateEmptyDatasetOperator(
+ task_id="create_airflow_test_dataset_delimiter",
+ dataset_id=DATASET_NAME_DELIMITER,
+ project_id=PROJECT_ID,
+ )
+
# [START howto_operator_gcs_to_bigquery_async]
load_string_based_csv = GCSToBigQueryOperator(
task_id="gcs_to_bigquery_example_str_csv_async",
@@ -81,6 +95,34 @@ with models.DAG(
max_id_key=MAX_ID_DATE,
deferrable=True,
)
+
+ load_json = GCSToBigQueryOperator(
+ task_id="gcs_to_bigquery_example_date_json_async",
+ bucket="cloud-samples-data",
+ source_objects=["bigquery/us-states/us-states.json"],
+ source_format="NEWLINE_DELIMITED_JSON",
+
destination_project_dataset_table=f"{DATASET_NAME_JSON}.{TABLE_NAME_JSON}",
+ write_disposition="WRITE_TRUNCATE",
+ external_table=False,
+ autodetect=True,
+ max_id_key=MAX_ID_STR,
+ deferrable=True,
+ )
+
+ load_csv_delimiter = GCSToBigQueryOperator(
+ task_id="gcs_to_bigquery_example_delimiter_async",
+ bucket="big-query-samples",
+ source_objects=["employees-tabular.csv"],
+ source_format="csv",
+
destination_project_dataset_table=f"{DATASET_NAME_DELIMITER}.{TABLE_NAME_DELIMITER}",
+ write_disposition="WRITE_TRUNCATE",
+ external_table=False,
+ autodetect=True,
+ field_delimiter="\t",
+ quote_character="",
+ max_id_key=MAX_ID_STR,
+ deferrable=True,
+ )
# [END howto_operator_gcs_to_bigquery_async]
delete_test_dataset_str = BigQueryDeleteDatasetOperator(
@@ -97,16 +139,36 @@ with models.DAG(
trigger_rule=TriggerRule.ALL_DONE,
)
+ delete_test_dataset_json = BigQueryDeleteDatasetOperator(
+ task_id="delete_airflow_test_json_dataset",
+ dataset_id=DATASET_NAME_JSON,
+ delete_contents=True,
+ trigger_rule=TriggerRule.ALL_DONE,
+ )
+
+ delete_test_dataset_delimiter = BigQueryDeleteDatasetOperator(
+ task_id="delete_airflow_test_delimiter",
+ dataset_id=DATASET_NAME_JSON,
+ delete_contents=True,
+ trigger_rule=TriggerRule.ALL_DONE,
+ )
+
(
# TEST SETUP
- create_test_dataset_for_string_fileds
- >> create_test_dataset_for_date_fileds
+ create_test_dataset_for_string_fields
+ >> create_test_dataset_for_date_fields
+ >> create_test_dataset_for_json_fields
+ >> create_test_dataset_for_delimiter_fields
# TEST BODY
>> load_string_based_csv
>> load_date_based_csv
+ >> load_json
+ >> load_csv_delimiter
# TEST TEARDOWN
>> delete_test_dataset_str
>> delete_test_dataset_date
+ >> delete_test_dataset_json
+ >> delete_test_dataset_delimiter
)
from tests.system.utils.watcher import watcher