This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 23264fb820 Fix for issue with reading schema fields for JSON files in GCSToBigQueryOperator (#28284)
23264fb820 is described below

commit 23264fb820c179e9951ea9706f68b13a9b3fdbc0
Author: VladaZakharova <[email protected]>
AuthorDate: Wed Dec 21 20:39:12 2022 +0100

    Fix for issue with reading schema fields for JSON files in GCSToBigQueryOperator (#28284)
---
 .../google/cloud/transfers/gcs_to_bigquery.py      |  447 +++--
 .../providers/google/cloud/triggers/bigquery.py    |    2 +
 .../google/cloud/transfers/test_gcs_to_bigquery.py | 1719 ++++++++------------
 .../cloud/gcs/example_gcs_to_bigquery_async.py     |   70 +-
 4 files changed, 1026 insertions(+), 1212 deletions(-)
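
For illustration, a minimal usage sketch of the operator path this commit touches; the bucket, object, and table names below are hypothetical. It loads a newline-delimited JSON file whose schema is read from a schema object in GCS, with source_format now validated against ALLOWED_FORMATS and upper-cased in the constructor:

    from airflow.providers.google.cloud.transfers.gcs_to_bigquery import (
        GCSToBigQueryOperator,
    )

    # Hypothetical names; schema_object points at a JSON list of field dicts,
    # e.g. [{"name": "id", "type": "INTEGER", "mode": "NULLABLE"}, ...].
    load_json = GCSToBigQueryOperator(
        task_id="gcs_to_bq_json",
        bucket="example-bucket",
        source_objects=["data/events.json"],
        source_format="newline_delimited_json",  # normalized to "NEWLINE_DELIMITED_JSON"
        schema_object="schemas/events_schema.json",
        destination_project_dataset_table="example-project.example_dataset.events",
        write_disposition="WRITE_TRUNCATE",
    )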

diff --git a/airflow/providers/google/cloud/transfers/gcs_to_bigquery.py b/airflow/providers/google/cloud/transfers/gcs_to_bigquery.py
index 63c625be87..21b3e3d865 100644
--- a/airflow/providers/google/cloud/transfers/gcs_to_bigquery.py
+++ b/airflow/providers/google/cloud/transfers/gcs_to_bigquery.py
@@ -21,17 +21,22 @@ from __future__ import annotations
 import json
 from typing import TYPE_CHECKING, Any, Sequence
 
-from google.api_core.exceptions import Conflict
+from google.api_core.exceptions import BadRequest, Conflict
 from google.api_core.retry import Retry
-from google.cloud.bigquery import DEFAULT_RETRY, CopyJob, ExtractJob, LoadJob, QueryJob
+from google.cloud.bigquery import (
+    DEFAULT_RETRY,
+    CopyJob,
+    ExternalConfig,
+    ExtractJob,
+    LoadJob,
+    QueryJob,
+    SchemaField,
+)
+from google.cloud.bigquery.table import EncryptionConfiguration, Table, TableReference
 
 from airflow import AirflowException
 from airflow.models import BaseOperator
-from airflow.providers.google.cloud.hooks.bigquery import (
-    BigQueryHook,
-    BigQueryJob,
-    _cleanse_time_partitioning,
-)
+from airflow.providers.google.cloud.hooks.bigquery import BigQueryHook, BigQueryJob
 from airflow.providers.google.cloud.hooks.gcs import GCSHook
 from airflow.providers.google.cloud.links.bigquery import BigQueryTableLink
 from airflow.providers.google.cloud.triggers.bigquery import BigQueryInsertJobTrigger
@@ -39,6 +44,15 @@ from airflow.providers.google.cloud.triggers.bigquery import BigQueryInsertJobTr
 if TYPE_CHECKING:
     from airflow.utils.context import Context
 
+ALLOWED_FORMATS = [
+    "CSV",
+    "NEWLINE_DELIMITED_JSON",
+    "AVRO",
+    "GOOGLE_SHEETS",
+    "DATASTORE_BACKUP",
+    "PARQUET",
+]
+
 
 class GCSToBigQueryOperator(BaseOperator):
     """
@@ -233,7 +247,13 @@ class GCSToBigQueryOperator(BaseOperator):
         # BQ config
         self.destination_project_dataset_table = destination_project_dataset_table
         self.schema_fields = schema_fields
-        self.source_format = source_format
+        if source_format.upper() not in ALLOWED_FORMATS:
+            raise ValueError(
+                f"{source_format} is not a valid source format. "
+                f"Please use one of the following types: {ALLOWED_FORMATS}."
+            )
+        else:
+            self.source_format = source_format.upper()
         self.compression = compression
         self.create_disposition = create_disposition
         self.skip_leading_rows = skip_leading_rows
@@ -300,6 +320,8 @@ class GCSToBigQueryOperator(BaseOperator):
             impersonation_chain=self.impersonation_chain,
         )
         self.hook = hook
+        self.source_format = self.source_format.upper()
+
         job_id = self.hook.generate_job_id(
             job_id=self.job_id,
             dag_id=self.dag_id,
@@ -312,113 +334,44 @@ class GCSToBigQueryOperator(BaseOperator):
         self.source_objects = (
             self.source_objects if isinstance(self.source_objects, list) else [self.source_objects]
         )
-        source_uris = [f"gs://{self.bucket}/{source_object}" for source_object in self.source_objects]
+        self.source_uris = [f"gs://{self.bucket}/{source_object}" for source_object in self.source_objects]
 
-        if not self.schema_fields and self.schema_object and self.source_format != "DATASTORE_BACKUP":
-            gcs_hook = GCSHook(
-                gcp_conn_id=self.gcp_conn_id,
-                delegate_to=self.delegate_to,
-                impersonation_chain=self.impersonation_chain,
-            )
-            self.schema_fields = json.loads(
-                gcs_hook.download(self.schema_object_bucket, self.schema_object).decode("utf-8")
-            )
-            self.log.info("Autodetected fields from schema object: %s", 
self.schema_fields)
+        if not self.schema_fields:
+            if not self.schema_object and not self.autodetect:
+                raise AirflowException(
+                    "Table schema was not found. Neither schema object nor 
schema fields were specified"
+                )
+            if self.schema_object and self.source_format != "DATASTORE_BACKUP":
+                gcs_hook = GCSHook(
+                    gcp_conn_id=self.gcp_conn_id,
+                    delegate_to=self.delegate_to,
+                    impersonation_chain=self.impersonation_chain,
+                )
+                self.schema_fields = json.loads(
+                    gcs_hook.download(self.schema_object_bucket, self.schema_object).decode("utf-8")
+                )
+                self.log.info("Loaded fields from schema object: %s", 
self.schema_fields)
+            else:
+                self.schema_fields = None
 
         if self.external_table:
             self.log.info("Creating a new BigQuery table for storing data...")
-            project_id, dataset_id, table_id = self.hook.split_tablename(
-                table_input=self.destination_project_dataset_table,
-                default_project_id=self.hook.project_id or "",
-            )
-            table_resource = {
-                "tableReference": {
-                    "projectId": project_id,
-                    "datasetId": dataset_id,
-                    "tableId": table_id,
-                },
-                "labels": self.labels,
-                "description": self.description,
-                "externalDataConfiguration": {
-                    "source_uris": source_uris,
-                    "source_format": self.source_format,
-                    "maxBadRecords": self.max_bad_records,
-                    "autodetect": self.autodetect,
-                    "compression": self.compression,
-                    "csvOptions": {
-                        "fieldDelimeter": self.field_delimiter,
-                        "skipLeadingRows": self.skip_leading_rows,
-                        "quote": self.quote_character,
-                        "allowQuotedNewlines": self.allow_quoted_newlines,
-                        "allowJaggedRows": self.allow_jagged_rows,
-                    },
-                },
-                "location": self.location,
-                "encryptionConfiguration": self.encryption_configuration,
-            }
-            table_resource_checked_schema = self._check_schema_fields(table_resource)
-            table = self.hook.create_empty_table(
-                table_resource=table_resource_checked_schema,
-            )
-            max_id = self._find_max_value_in_column()
+            table_obj_api_repr = self._create_empty_table()
+
             BigQueryTableLink.persist(
                 context=context,
                 task_instance=self,
-                dataset_id=table.to_api_repr()["tableReference"]["datasetId"],
-                project_id=table.to_api_repr()["tableReference"]["projectId"],
-                table_id=table.to_api_repr()["tableReference"]["tableId"],
+                dataset_id=table_obj_api_repr["tableReference"]["datasetId"],
+                project_id=table_obj_api_repr["tableReference"]["projectId"],
+                table_id=table_obj_api_repr["tableReference"]["tableId"],
             )
-            return max_id
+            if self.max_id_key:
+                max_id = self._find_max_value_in_column()
+                return max_id
         else:
             self.log.info("Using existing BigQuery table for storing data...")
-            destination_project, destination_dataset, destination_table = self.hook.split_tablename(
-                table_input=self.destination_project_dataset_table,
-                default_project_id=self.hook.project_id or "",
-                var_name="destination_project_dataset_table",
-            )
-            self.configuration = {
-                "load": {
-                    "autodetect": self.autodetect,
-                    "createDisposition": self.create_disposition,
-                    "destinationTable": {
-                        "projectId": destination_project,
-                        "datasetId": destination_dataset,
-                        "tableId": destination_table,
-                    },
-                    "destinationTableProperties": {
-                        "description": self.description,
-                        "labels": self.labels,
-                    },
-                    "sourceFormat": self.source_format,
-                    "skipLeadingRows": self.skip_leading_rows,
-                    "sourceUris": source_uris,
-                    "writeDisposition": self.write_disposition,
-                    "ignoreUnknownValues": self.ignore_unknown_values,
-                    "allowQuotedNewlines": self.allow_quoted_newlines,
-                    "encoding": self.encoding,
-                    "allowJaggedRows": self.allow_jagged_rows,
-                    "fieldDelimiter": self.field_delimiter,
-                    "maxBadRecords": self.max_bad_records,
-                    "quote": self.quote_character,
-                    "schemaUpdateOptions": self.schema_update_options,
-                },
-            }
-            if self.cluster_fields:
-                self.configuration["load"].update({"clustering": {"fields": 
self.cluster_fields}})
-            time_partitioning = _cleanse_time_partitioning(
-                self.destination_project_dataset_table, self.time_partitioning
-            )
-            if time_partitioning:
-                self.configuration["load"].update({"timePartitioning": 
time_partitioning})
-            # fields that should only be set if defined
-            set_if_def = {
-                "quote": self.quote_character,
-                "destinationEncryptionConfiguration": 
self.encryption_configuration,
-            }
-            for k, v in set_if_def.items():
-                if v:
-                    self.configuration["load"][k] = v
-            self.configuration = self._check_schema_fields(self.configuration)
+            self.configuration = self._use_existing_table()
+
             try:
                 self.log.info("Executing: %s", self.configuration)
                 job = self._submit_job(self.hook, job_id)
@@ -480,9 +433,9 @@ class GCSToBigQueryOperator(BaseOperator):
                 )
             else:
                 job.result(timeout=self.result_timeout, retry=self.result_retry)
-                max_id = self._find_max_value_in_column()
                 self._handle_job_error(job)
-                return max_id
+                if self.max_id_key:
+                    return self._find_max_value_in_column()
 
     def execute_complete(self, context: Context, event: dict[str, Any]):
         """
@@ -512,7 +465,6 @@ class GCSToBigQueryOperator(BaseOperator):
                 f"SELECT MAX({self.max_id_key}) AS max_value "
                 f"FROM {self.destination_project_dataset_table}"
             )
-
             self.configuration = {
                 "query": {
                     "query": select_command,
@@ -520,8 +472,17 @@ class GCSToBigQueryOperator(BaseOperator):
                     "schemaUpdateOptions": [],
                 }
             }
-            job_id = hook.insert_job(configuration=self.configuration, project_id=hook.project_id)
-            rows = list(hook.get_job(job_id=job_id, location=self.location).result())
+            try:
+                job_id = hook.insert_job(configuration=self.configuration, project_id=hook.project_id)
+                rows = list(hook.get_job(job_id=job_id, location=self.location).result())
+            except BadRequest as e:
+                if "Unrecognized name:" in e.message:
+                    raise AirflowException(
+                        f"Could not determine MAX value in column 
{self.max_id_key} "
+                        f"since the default value of 'string_field_n' was set 
by BQ"
+                    )
+                else:
+                    raise AirflowException(e.message)
             if rows:
                 for row in rows:
                     max_id = row[0] if row[0] else 0
@@ -535,53 +496,231 @@ class GCSToBigQueryOperator(BaseOperator):
             else:
                 raise RuntimeError(f"The {select_command} returned no rows!")
 
-    def _check_schema_fields(self, table_resource):
-        """
-        Helper method to detect schema fields if they were not specified by user and autodetect=True.
-        If source_objects were passed, method reads the second row in CSV file. If there is at least one digit
-        table_resurce is returned without changes so that BigQuery can determine schema_fields in the
-        next step.
-        If there are only characters, the first row with fields is used to construct schema_fields argument
-        with type 'STRING'. Table_resource is updated with new schema_fileds key and returned back to operator
-        :param table_resource: Configuration or table_resource dictionary
-        :return: table_resource: Updated table_resource dict with schema_fields
-        """
-        if not self.autodetect and not self.schema_fields:
-            raise RuntimeError(
-                "Table schema was not found. Set autodetect=True to "
-                "automatically set schema fields from source objects or pass "
-                "schema_fields explicitly"
+    def _create_empty_table(self):
+        project_id, dataset_id, table_id = self.hook.split_tablename(
+            table_input=self.destination_project_dataset_table,
+            default_project_id=self.hook.project_id or "",
+        )
+
+        external_config_api_repr = {
+            "autodetect": self.autodetect,
+            "sourceFormat": self.source_format,
+            "sourceUris": self.source_uris,
+            "compression": self.compression.upper(),
+            "ignoreUnknownValues": self.ignore_unknown_values,
+        }
+        # if following fields are not specified in src_fmt_configs,
+        # honor the top-level params for backward-compatibility
+        backward_compatibility_configs = {
+            "skipLeadingRows": self.skip_leading_rows,
+            "fieldDelimiter": self.field_delimiter,
+            "quote": self.quote_character,
+            "allowQuotedNewlines": self.allow_quoted_newlines,
+            "allowJaggedRows": self.allow_jagged_rows,
+            "encoding": self.encoding,
+        }
+        src_fmt_to_param_mapping = {"CSV": "csvOptions", "GOOGLE_SHEETS": "googleSheetsOptions"}
+        src_fmt_to_configs_mapping = {
+            "csvOptions": [
+                "allowJaggedRows",
+                "allowQuotedNewlines",
+                "fieldDelimiter",
+                "skipLeadingRows",
+                "quote",
+                "encoding",
+            ],
+            "googleSheetsOptions": ["skipLeadingRows"],
+        }
+        if self.source_format in src_fmt_to_param_mapping.keys():
+            valid_configs = src_fmt_to_configs_mapping[src_fmt_to_param_mapping[self.source_format]]
+            self.src_fmt_configs = self._validate_src_fmt_configs(
+                self.source_format, self.src_fmt_configs, valid_configs, backward_compatibility_configs
             )
-        elif not self.schema_fields:
-            for source_object in self.source_objects:
-                gcs_hook = GCSHook(
-                    gcp_conn_id=self.gcp_conn_id,
-                    delegate_to=self.delegate_to,
-                    impersonation_chain=self.impersonation_chain,
-                )
-                blob = gcs_hook.download(
-                    bucket_name=self.schema_object_bucket,
-                    object_name=source_object,
+            external_config_api_repr[src_fmt_to_param_mapping[self.source_format]] = self.src_fmt_configs
+
+        external_config = ExternalConfig.from_api_repr(external_config_api_repr)
+        if self.schema_fields:
+            external_config.schema = [SchemaField.from_api_repr(f) for f in self.schema_fields]
+        if self.max_bad_records:
+            external_config.max_bad_records = self.max_bad_records
+
+        # build table definition
+        table = Table(
+            table_ref=TableReference.from_string(self.destination_project_dataset_table, project_id)
+        )
+        table.external_data_configuration = external_config
+        if self.labels:
+            table.labels = self.labels
+
+        if self.description:
+            table.description = self.description
+
+        if self.encryption_configuration:
+            table.encryption_configuration = EncryptionConfiguration.from_api_repr(
+                self.encryption_configuration
+            )
+        table_obj_api_repr = table.to_api_repr()
+
+        self.log.info("Creating external table: %s", 
self.destination_project_dataset_table)
+        self.hook.create_empty_table(
+            table_resource=table_obj_api_repr, project_id=project_id, location=self.location, exists_ok=True
+        )
+        self.log.info("External table created successfully: %s", 
self.destination_project_dataset_table)
+        return table_obj_api_repr
+
+    def _use_existing_table(self):
+        destination_project, destination_dataset, destination_table = self.hook.split_tablename(
+            table_input=self.destination_project_dataset_table,
+            default_project_id=self.hook.project_id or "",
+            var_name="destination_project_dataset_table",
+        )
+
+        # bigquery also allows you to define how you want a table's schema to change
+        # as a side effect of a load
+        # for more details:
+        # https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs#configuration.load.schemaUpdateOptions
+        allowed_schema_update_options = ["ALLOW_FIELD_ADDITION", "ALLOW_FIELD_RELAXATION"]
+        if not set(allowed_schema_update_options).issuperset(set(self.schema_update_options)):
+            raise ValueError(
+                f"{self.schema_update_options} contains invalid schema update options. "
+                f"Please only use one or more of the following options: {allowed_schema_update_options}"
+            )
+
+        self.configuration = {
+            "load": {
+                "autodetect": self.autodetect,
+                "createDisposition": self.create_disposition,
+                "destinationTable": {
+                    "projectId": destination_project,
+                    "datasetId": destination_dataset,
+                    "tableId": destination_table,
+                },
+                "sourceFormat": self.source_format,
+                "sourceUris": self.source_uris,
+                "writeDisposition": self.write_disposition,
+                "ignoreUnknownValues": self.ignore_unknown_values,
+            },
+        }
+        self.time_partitioning = self._cleanse_time_partitioning(
+            self.destination_project_dataset_table, self.time_partitioning
+        )
+        if self.time_partitioning:
+            self.configuration["load"].update({"timePartitioning": 
self.time_partitioning})
+
+        if self.cluster_fields:
+            self.configuration["load"].update({"clustering": {"fields": 
self.cluster_fields}})
+
+        if self.schema_fields:
+            self.configuration["load"]["schema"] = {"fields": 
self.schema_fields}
+
+        if self.schema_update_options:
+            if self.write_disposition not in ["WRITE_APPEND", "WRITE_TRUNCATE"]:
+                raise ValueError(
+                    "schema_update_options is only "
+                    "allowed if write_disposition is "
+                    "'WRITE_APPEND' or 'WRITE_TRUNCATE'."
                 )
-                fields, values = [item.split(",") for item in blob.decode("utf-8").splitlines()][:2]
-                import re
+            else:
+                # To provide backward compatibility
+                self.schema_update_options = list(self.schema_update_options or [])
+                self.log.info("Adding experimental 'schemaUpdateOptions': %s", self.schema_update_options)
+                self.configuration["load"]["schemaUpdateOptions"] = self.schema_update_options
+
+        if self.max_bad_records:
+            self.configuration["load"]["maxBadRecords"] = self.max_bad_records
+
+        if self.encryption_configuration:
+            self.configuration["load"]["destinationEncryptionConfiguration"] = 
self.encryption_configuration
+
+        if self.labels or self.description:
+            self.configuration["load"].update({"destinationTableProperties": 
{}})
+            if self.labels:
+                
self.configuration["load"]["destinationTableProperties"]["labels"] = self.labels
+            if self.description:
+                
self.configuration["load"]["destinationTableProperties"]["description"] = 
self.description
+
+        src_fmt_to_configs_mapping = {
+            "CSV": [
+                "allowJaggedRows",
+                "allowQuotedNewlines",
+                "autodetect",
+                "fieldDelimiter",
+                "skipLeadingRows",
+                "ignoreUnknownValues",
+                "nullMarker",
+                "quote",
+                "encoding",
+            ],
+            "DATASTORE_BACKUP": ["projectionFields"],
+            "NEWLINE_DELIMITED_JSON": ["autodetect", "ignoreUnknownValues"],
+            "PARQUET": ["autodetect", "ignoreUnknownValues"],
+            "AVRO": ["useAvroLogicalTypes"],
+        }
+
+        valid_configs = src_fmt_to_configs_mapping[self.source_format]
+
+        # if following fields are not specified in src_fmt_configs,
+        # honor the top-level params for backward-compatibility
+        backward_compatibility_configs = {
+            "skipLeadingRows": self.skip_leading_rows,
+            "fieldDelimiter": self.field_delimiter,
+            "ignoreUnknownValues": self.ignore_unknown_values,
+            "quote": self.quote_character,
+            "allowQuotedNewlines": self.allow_quoted_newlines,
+            "encoding": self.encoding,
+        }
+
+        self.src_fmt_configs = self._validate_src_fmt_configs(
+            self.source_format, self.src_fmt_configs, valid_configs, backward_compatibility_configs
+        )
 
-                if any(re.match(r"[\d\-\\.]+$", value) for value in values):
-                    return table_resource
-                else:
-                    schema_fields = []
-                    for field in fields:
-                        schema_fields.append({"name": field, "type": "STRING", "mode": "NULLABLE"})
-                    self.schema_fields = schema_fields
-                    if self.external_table:
-                        table_resource["externalDataConfiguration"]["csvOptions"]["skipLeadingRows"] = 1
-                    elif not self.external_table:
-                        table_resource["load"]["skipLeadingRows"] = 1
-        if self.external_table:
-            table_resource["schema"] = {"fields": self.schema_fields}
-        elif not self.external_table:
-            table_resource["load"]["schema"] = {"fields": self.schema_fields}
-        return table_resource
+        self.configuration["load"].update(self.src_fmt_configs)
+
+        if self.allow_jagged_rows:
+            self.configuration["load"]["allowJaggedRows"] = 
self.allow_jagged_rows
+        return self.configuration
+
+    def _validate_src_fmt_configs(
+        self,
+        source_format: str,
+        src_fmt_configs: dict,
+        valid_configs: list[str],
+        backward_compatibility_configs: dict | None = None,
+    ) -> dict:
+        """
+        Validates the given src_fmt_configs against a valid configuration for the source format.
+        Adds the backward compatibility config to the src_fmt_configs.
+
+        :param source_format: File format to export.
+        :param src_fmt_configs: Configure optional fields specific to the source format.
+        :param valid_configs: Valid configuration specific to the source format
+        :param backward_compatibility_configs: The top-level params for backward-compatibility
+        """
+        if backward_compatibility_configs is None:
+            backward_compatibility_configs = {}
+
+        for k, v in backward_compatibility_configs.items():
+            if k not in src_fmt_configs and k in valid_configs:
+                src_fmt_configs[k] = v
+
+        for k, v in src_fmt_configs.items():
+            if k not in valid_configs:
+                raise ValueError(f"{k} is not a valid src_fmt_configs for type 
{source_format}.")
+
+        return src_fmt_configs
+
+    def _cleanse_time_partitioning(
+        self, destination_dataset_table: str | None, time_partitioning_in: dict | None
+    ) -> dict:  # if it is a partitioned table ($ is in the table name) add partition load option
+
+        if time_partitioning_in is None:
+            time_partitioning_in = {}
+
+        time_partitioning_out = {}
+        if destination_dataset_table and "$" in destination_dataset_table:
+            time_partitioning_out["type"] = "DAY"
+        time_partitioning_out.update(time_partitioning_in)
+        return time_partitioning_out
 
     def on_kill(self) -> None:
         if self.job_id and self.cancel_on_kill:
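
For reference, a standalone sketch of the _validate_src_fmt_configs logic added above, pulled out of the operator so it can be run directly (the example values are made up). Backward-compatibility defaults are merged in first; any remaining key outside the valid set is rejected:

    from __future__ import annotations

    def validate_src_fmt_configs(
        source_format: str,
        src_fmt_configs: dict,
        valid_configs: list[str],
        backward_compatibility_configs: dict | None = None,
    ) -> dict:
        # Fall back to top-level operator params for keys the user did not set.
        for k, v in (backward_compatibility_configs or {}).items():
            if k not in src_fmt_configs and k in valid_configs:
                src_fmt_configs[k] = v
        # Reject anything that is not valid for this source format.
        for k in src_fmt_configs:
            if k not in valid_configs:
                raise ValueError(f"{k} is not a valid src_fmt_configs for type {source_format}.")
        return src_fmt_configs

    # For NEWLINE_DELIMITED_JSON only "autodetect" and "ignoreUnknownValues" pass:
    print(
        validate_src_fmt_configs(
            "NEWLINE_DELIMITED_JSON",
            {"ignoreUnknownValues": True},
            ["autodetect", "ignoreUnknownValues"],
            {"ignoreUnknownValues": False},
        )
    )  # {'ignoreUnknownValues': True}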
diff --git a/airflow/providers/google/cloud/triggers/bigquery.py b/airflow/providers/google/cloud/triggers/bigquery.py
index 271b02e3fc..9dd780d24a 100644
--- a/airflow/providers/google/cloud/triggers/bigquery.py
+++ b/airflow/providers/google/cloud/triggers/bigquery.py
@@ -88,12 +88,14 @@ class BigQueryInsertJobTrigger(BaseTrigger):
                             "message": "Job completed",
                         }
                     )
+                    return
                 elif response_from_hook == "pending":
                     self.log.info("Query is still running...")
                     self.log.info("Sleeping for %s seconds.", 
self.poll_interval)
                     await asyncio.sleep(self.poll_interval)
                 else:
                     yield TriggerEvent({"status": "error", "message": 
response_from_hook})
+                    return
 
             except Exception as e:
                 self.log.exception("Exception occurred while checking for 
query completion")
diff --git a/tests/providers/google/cloud/transfers/test_gcs_to_bigquery.py b/tests/providers/google/cloud/transfers/test_gcs_to_bigquery.py
index 1a9356134c..5340448083 100644
--- a/tests/providers/google/cloud/transfers/test_gcs_to_bigquery.py
+++ b/tests/providers/google/cloud/transfers/test_gcs_to_bigquery.py
@@ -54,8 +54,9 @@ SCHEMA_FIELDS_INT = [
 ]
 SCHEMA_BUCKET = "test-schema-bucket"
 SCHEMA_OBJECT = "test/schema/schema.json"
-TEST_SOURCE_OBJECTS = ["test/objects/test.csv"]
-TEST_SOURCE_OBJECTS_AS_STRING = "test/objects/test.csv"
+TEST_SOURCE_OBJECTS_LIST = ["test/objects/test.csv"]
+TEST_SOURCE_OBJECTS = "test/objects/test.csv"
+TEST_SOURCE_OBJECTS_JSON = "test/objects/test.json"
 LABELS = {"k1": "v1"}
 DESCRIPTION = "Test Description"
 
@@ -63,9 +64,11 @@ job_id = "123456"
 hash_ = "hash"
 pytest.real_job_id = f"{job_id}_{hash_}"
 
+GCS_TO_BQ_PATH = "airflow.providers.google.cloud.transfers.gcs_to_bigquery.{}"
+
 
 class TestGCSToBigQueryOperator(unittest.TestCase):
-    @mock.patch("airflow.providers.google.cloud.transfers.gcs_to_bigquery.BigQueryHook")
+    @mock.patch(GCS_TO_BQ_PATH.format("BigQueryHook"))
     def test_max_value_external_table_should_execute_successfully(self, hook):
         hook.return_value.insert_job.side_effect = [
             MagicMock(job_id=pytest.real_job_id, error_result=False),
@@ -89,28 +92,29 @@ class TestGCSToBigQueryOperator(unittest.TestCase):
 
         assert result == "1"
         hook.return_value.create_empty_table.assert_called_once_with(
+            exists_ok=True,
+            location=None,
+            project_id=PROJECT_ID,
             table_resource={
                 "tableReference": {"projectId": PROJECT_ID, "datasetId": 
DATASET, "tableId": TABLE},
-                "labels": None,
-                "description": None,
+                "labels": {},
                 "externalDataConfiguration": {
-                    "source_uris": 
[f"gs://{TEST_BUCKET}/{TEST_SOURCE_OBJECTS_AS_STRING}"],
-                    "source_format": "CSV",
-                    "maxBadRecords": 0,
                     "autodetect": True,
+                    "sourceFormat": "CSV",
+                    "sourceUris": 
[f"gs://{TEST_BUCKET}/{TEST_SOURCE_OBJECTS}"],
                     "compression": "NONE",
+                    "ignoreUnknownValues": False,
                     "csvOptions": {
-                        "fieldDelimeter": ",",
                         "skipLeadingRows": None,
+                        "fieldDelimiter": ",",
                         "quote": None,
                         "allowQuotedNewlines": False,
                         "allowJaggedRows": False,
+                        "encoding": "UTF-8",
                     },
+                    "schema": {"fields": SCHEMA_FIELDS},
                 },
-                "location": None,
-                "encryptionConfiguration": None,
-                "schema": {"fields": SCHEMA_FIELDS},
-            }
+            },
         )
         hook.return_value.insert_job.assert_called_once_with(
             configuration={
@@ -123,7 +127,7 @@ class TestGCSToBigQueryOperator(unittest.TestCase):
             project_id=hook.return_value.project_id,
         )
 
-    @mock.patch("airflow.providers.google.cloud.transfers.gcs_to_bigquery.BigQueryHook")
+    @mock.patch(GCS_TO_BQ_PATH.format("BigQueryHook"))
     def test_max_value_without_external_table_should_execute_successfully(self, hook):
         hook.return_value.insert_job.side_effect = [
             MagicMock(job_id=pytest.real_job_id, error_result=False),
@@ -150,35 +154,28 @@ class TestGCSToBigQueryOperator(unittest.TestCase):
         calls = [
             call(
                 configuration={
-                    "load": dict(
-                        autodetect=True,
-                        createDisposition="CREATE_IF_NEEDED",
-                        destinationTable={"projectId": PROJECT_ID, 
"datasetId": DATASET, "tableId": TABLE},
-                        destinationTableProperties={
-                            "description": None,
-                            "labels": None,
-                        },
-                        sourceFormat="CSV",
-                        skipLeadingRows=None,
-                        sourceUris=[f"gs://{TEST_BUCKET}/{TEST_SOURCE_OBJECTS_AS_STRING}"],
-                        writeDisposition=WRITE_DISPOSITION,
-                        ignoreUnknownValues=False,
-                        allowQuotedNewlines=False,
-                        encoding="UTF-8",
-                        schema={"fields": SCHEMA_FIELDS},
-                        allowJaggedRows=False,
-                        fieldDelimiter=",",
-                        maxBadRecords=0,
-                        quote=None,
-                        schemaUpdateOptions=(),
-                    ),
+                    "load": {
+                        "autodetect": True,
+                        "createDisposition": "CREATE_IF_NEEDED",
+                        "destinationTable": {"projectId": PROJECT_ID, 
"datasetId": DATASET, "tableId": TABLE},
+                        "sourceFormat": "CSV",
+                        "sourceUris": 
[f"gs://{TEST_BUCKET}/{TEST_SOURCE_OBJECTS}"],
+                        "writeDisposition": WRITE_DISPOSITION,
+                        "ignoreUnknownValues": False,
+                        "schema": {"fields": SCHEMA_FIELDS},
+                        "skipLeadingRows": None,
+                        "fieldDelimiter": ",",
+                        "quote": None,
+                        "allowQuotedNewlines": False,
+                        "encoding": "UTF-8",
+                    }
                 },
-                project_id=hook.return_value.project_id,
-                location=None,
                 job_id=pytest.real_job_id,
-                timeout=None,
-                retry=DEFAULT_RETRY,
+                location=None,
                 nowait=True,
+                project_id=hook.return_value.project_id,
+                retry=DEFAULT_RETRY,
+                timeout=None,
             ),
             call(
                 configuration={
@@ -194,7 +191,7 @@ class TestGCSToBigQueryOperator(unittest.TestCase):
 
         hook.return_value.insert_job.assert_has_calls(calls)
 
-    @mock.patch("airflow.providers.google.cloud.transfers.gcs_to_bigquery.BigQueryHook")
+    @mock.patch(GCS_TO_BQ_PATH.format("BigQueryHook"))
     def test_max_value_should_throw_ex_when_query_returns_no_rows(self, hook):
         hook.return_value.insert_job.side_effect = [
             MagicMock(job_id=pytest.real_job_id, error_result=False),
@@ -218,35 +215,28 @@ class TestGCSToBigQueryOperator(unittest.TestCase):
         calls = [
             call(
                 configuration={
-                    "load": dict(
-                        autodetect=True,
-                        createDisposition="CREATE_IF_NEEDED",
-                        destinationTable={"projectId": PROJECT_ID, 
"datasetId": DATASET, "tableId": TABLE},
-                        destinationTableProperties={
-                            "description": None,
-                            "labels": None,
-                        },
-                        sourceFormat="CSV",
-                        skipLeadingRows=None,
-                        sourceUris=[f"gs://{TEST_BUCKET}/{TEST_SOURCE_OBJECTS_AS_STRING}"],
-                        writeDisposition=WRITE_DISPOSITION,
-                        ignoreUnknownValues=False,
-                        allowQuotedNewlines=False,
-                        encoding="UTF-8",
-                        schema={"fields": SCHEMA_FIELDS},
-                        allowJaggedRows=False,
-                        fieldDelimiter=",",
-                        maxBadRecords=0,
-                        quote=None,
-                        schemaUpdateOptions=(),
-                    ),
+                    "load": {
+                        "autodetect": True,
+                        "createDisposition": "CREATE_IF_NEEDED",
+                        "destinationTable": {"projectId": PROJECT_ID, 
"datasetId": DATASET, "tableId": TABLE},
+                        "sourceFormat": "CSV",
+                        "sourceUris": 
[f"gs://{TEST_BUCKET}/{TEST_SOURCE_OBJECTS}"],
+                        "writeDisposition": "WRITE_TRUNCATE",
+                        "ignoreUnknownValues": False,
+                        "schema": {"fields": SCHEMA_FIELDS},
+                        "skipLeadingRows": None,
+                        "fieldDelimiter": ",",
+                        "quote": None,
+                        "allowQuotedNewlines": False,
+                        "encoding": "UTF-8",
+                    }
                 },
-                project_id=hook.return_value.project_id,
-                location=None,
                 job_id=pytest.real_job_id,
-                timeout=None,
-                retry=DEFAULT_RETRY,
+                location=None,
                 nowait=True,
+                project_id=hook.return_value.project_id,
+                retry=DEFAULT_RETRY,
+                timeout=None,
             ),
             call(
                 configuration={
@@ -262,7 +252,7 @@ class TestGCSToBigQueryOperator(unittest.TestCase):
 
         hook.return_value.insert_job.assert_has_calls(calls)
 
-    @mock.patch("airflow.providers.google.cloud.transfers.gcs_to_bigquery.BigQueryHook")
+    @mock.patch(GCS_TO_BQ_PATH.format("BigQueryHook"))
     def test_labels_external_table_should_execute_successfully(self, hook):
         hook.return_value.insert_job.side_effect = [
             MagicMock(job_id=pytest.real_job_id, error_result=False),
@@ -284,31 +274,32 @@ class TestGCSToBigQueryOperator(unittest.TestCase):
 
         operator.execute(context=MagicMock())
         hook.return_value.create_empty_table.assert_called_once_with(
+            exists_ok=True,
+            location=None,
+            project_id=PROJECT_ID,
             table_resource={
                 "tableReference": {"projectId": PROJECT_ID, "datasetId": 
DATASET, "tableId": TABLE},
                 "labels": LABELS,
-                "description": None,
                 "externalDataConfiguration": {
-                    "source_uris": 
[f"gs://{TEST_BUCKET}/{TEST_SOURCE_OBJECTS_AS_STRING}"],
-                    "source_format": "CSV",
-                    "maxBadRecords": 0,
                     "autodetect": True,
+                    "sourceFormat": "CSV",
+                    "sourceUris": 
[f"gs://{TEST_BUCKET}/{TEST_SOURCE_OBJECTS}"],
                     "compression": "NONE",
+                    "ignoreUnknownValues": False,
                     "csvOptions": {
-                        "fieldDelimeter": ",",
                         "skipLeadingRows": None,
+                        "fieldDelimiter": ",",
                         "quote": None,
                         "allowQuotedNewlines": False,
                         "allowJaggedRows": False,
+                        "encoding": "UTF-8",
                     },
+                    "schema": {"fields": SCHEMA_FIELDS},
                 },
-                "location": None,
-                "encryptionConfiguration": None,
-                "schema": {"fields": SCHEMA_FIELDS},
-            }
+            },
         )
 
-    @mock.patch("airflow.providers.google.cloud.transfers.gcs_to_bigquery.BigQueryHook")
+    @mock.patch(GCS_TO_BQ_PATH.format("BigQueryHook"))
     def test_labels_without_external_table_should_execute_successfully(self, hook):
         hook.return_value.insert_job.side_effect = [
             MagicMock(job_id=pytest.real_job_id, error_result=False),
@@ -332,41 +323,35 @@ class TestGCSToBigQueryOperator(unittest.TestCase):
         calls = [
             call(
                 configuration={
-                    "load": dict(
-                        autodetect=True,
-                        createDisposition="CREATE_IF_NEEDED",
-                        destinationTable={"projectId": PROJECT_ID, 
"datasetId": DATASET, "tableId": TABLE},
-                        destinationTableProperties={
-                            "description": None,
-                            "labels": LABELS,
-                        },
-                        sourceFormat="CSV",
-                        skipLeadingRows=None,
-                        sourceUris=[f"gs://{TEST_BUCKET}/{TEST_SOURCE_OBJECTS_AS_STRING}"],
-                        writeDisposition=WRITE_DISPOSITION,
-                        ignoreUnknownValues=False,
-                        allowQuotedNewlines=False,
-                        encoding="UTF-8",
-                        schema={"fields": SCHEMA_FIELDS},
-                        allowJaggedRows=False,
-                        fieldDelimiter=",",
-                        maxBadRecords=0,
-                        quote=None,
-                        schemaUpdateOptions=(),
-                    ),
+                    "load": {
+                        "autodetect": True,
+                        "createDisposition": "CREATE_IF_NEEDED",
+                        "destinationTable": {"projectId": PROJECT_ID, 
"datasetId": DATASET, "tableId": TABLE},
+                        "sourceFormat": "CSV",
+                        "sourceUris": 
[f"gs://{TEST_BUCKET}/{TEST_SOURCE_OBJECTS}"],
+                        "writeDisposition": "WRITE_TRUNCATE",
+                        "ignoreUnknownValues": False,
+                        "schema": {"fields": SCHEMA_FIELDS},
+                        "destinationTableProperties": {"labels": LABELS},
+                        "skipLeadingRows": None,
+                        "fieldDelimiter": ",",
+                        "quote": None,
+                        "allowQuotedNewlines": False,
+                        "encoding": "UTF-8",
+                    }
                 },
-                project_id=hook.return_value.project_id,
-                location=None,
                 job_id=pytest.real_job_id,
-                timeout=None,
-                retry=DEFAULT_RETRY,
+                location=None,
                 nowait=True,
-            ),
+                project_id=hook.return_value.project_id,
+                retry=DEFAULT_RETRY,
+                timeout=None,
+            )
         ]
 
         hook.return_value.insert_job.assert_has_calls(calls)
 
-    @mock.patch("airflow.providers.google.cloud.transfers.gcs_to_bigquery.BigQueryHook")
+    @mock.patch(GCS_TO_BQ_PATH.format("BigQueryHook"))
     def test_description_external_table_should_execute_successfully(self, hook):
         hook.return_value.insert_job.side_effect = [
             MagicMock(job_id=pytest.real_job_id, error_result=False),
@@ -388,31 +373,33 @@ class TestGCSToBigQueryOperator(unittest.TestCase):
 
         operator.execute(context=MagicMock())
         hook.return_value.create_empty_table.assert_called_once_with(
+            exists_ok=True,
+            location=None,
+            project_id=PROJECT_ID,
             table_resource={
                 "tableReference": {"projectId": PROJECT_ID, "datasetId": 
DATASET, "tableId": TABLE},
-                "labels": None,
-                "description": DESCRIPTION,
+                "labels": {},
                 "externalDataConfiguration": {
-                    "source_uris": 
[f"gs://{TEST_BUCKET}/{TEST_SOURCE_OBJECTS_AS_STRING}"],
-                    "source_format": "CSV",
-                    "maxBadRecords": 0,
                     "autodetect": True,
+                    "sourceFormat": "CSV",
+                    "sourceUris": 
[f"gs://{TEST_BUCKET}/{TEST_SOURCE_OBJECTS}"],
                     "compression": "NONE",
+                    "ignoreUnknownValues": False,
                     "csvOptions": {
-                        "fieldDelimeter": ",",
                         "skipLeadingRows": None,
+                        "fieldDelimiter": ",",
                         "quote": None,
                         "allowQuotedNewlines": False,
                         "allowJaggedRows": False,
+                        "encoding": "UTF-8",
                     },
+                    "schema": {"fields": SCHEMA_FIELDS},
                 },
-                "location": None,
-                "encryptionConfiguration": None,
-                "schema": {"fields": SCHEMA_FIELDS},
-            }
+                "description": DESCRIPTION,
+            },
         )
 
-    @mock.patch("airflow.providers.google.cloud.transfers.gcs_to_bigquery.BigQueryHook")
+    @mock.patch(GCS_TO_BQ_PATH.format("BigQueryHook"))
     def test_description_without_external_table_should_execute_successfully(self, hook):
         hook.return_value.insert_job.side_effect = [
             MagicMock(job_id=pytest.real_job_id, error_result=False),
@@ -442,21 +429,17 @@ class TestGCSToBigQueryOperator(unittest.TestCase):
                         destinationTable={"projectId": PROJECT_ID, 
"datasetId": DATASET, "tableId": TABLE},
                         destinationTableProperties={
                             "description": DESCRIPTION,
-                            "labels": None,
                         },
                         sourceFormat="CSV",
                         skipLeadingRows=None,
-                        sourceUris=[f"gs://{TEST_BUCKET}/{TEST_SOURCE_OBJECTS_AS_STRING}"],
+                        sourceUris=[f"gs://{TEST_BUCKET}/{TEST_SOURCE_OBJECTS}"],
                         writeDisposition=WRITE_DISPOSITION,
                         ignoreUnknownValues=False,
                         allowQuotedNewlines=False,
                         encoding="UTF-8",
                         schema={"fields": SCHEMA_FIELDS},
-                        allowJaggedRows=False,
-                        fieldDelimiter=",",
-                        maxBadRecords=0,
                         quote=None,
-                        schemaUpdateOptions=(),
+                        fieldDelimiter=",",
                     ),
                 },
                 project_id=hook.return_value.project_id,
@@ -467,10 +450,9 @@ class TestGCSToBigQueryOperator(unittest.TestCase):
                 nowait=True,
             ),
         ]
-
         hook.return_value.insert_job.assert_has_calls(calls)
 
-    @mock.patch("airflow.providers.google.cloud.transfers.gcs_to_bigquery.BigQueryHook")
+    @mock.patch(GCS_TO_BQ_PATH.format("BigQueryHook"))
     def test_source_objs_as_list_external_table_should_execute_successfully(self, hook):
         hook.return_value.insert_job.side_effect = [
             MagicMock(job_id=pytest.real_job_id, error_result=False),
@@ -491,33 +473,34 @@ class TestGCSToBigQueryOperator(unittest.TestCase):
         operator.execute(context=MagicMock())
 
         hook.return_value.create_empty_table.assert_called_once_with(
+            exists_ok=True,
+            location=None,
+            project_id=PROJECT_ID,
             table_resource={
                 "tableReference": {"projectId": PROJECT_ID, "datasetId": 
DATASET, "tableId": TABLE},
-                "labels": None,
-                "description": None,
+                "labels": {},
                 "externalDataConfiguration": {
-                    "source_uris": [
-                        f"gs://{TEST_BUCKET}/{source_object}" for 
source_object in TEST_SOURCE_OBJECTS
-                    ],
-                    "source_format": "CSV",
-                    "maxBadRecords": 0,
                     "autodetect": True,
+                    "sourceFormat": "CSV",
+                    "sourceUris": [
+                        f"gs://{TEST_BUCKET}/{source_object}" for 
source_object in TEST_SOURCE_OBJECTS_LIST
+                    ],
                     "compression": "NONE",
+                    "ignoreUnknownValues": False,
                     "csvOptions": {
-                        "fieldDelimeter": ",",
                         "skipLeadingRows": None,
+                        "fieldDelimiter": ",",
                         "quote": None,
                         "allowQuotedNewlines": False,
                         "allowJaggedRows": False,
+                        "encoding": "UTF-8",
                     },
+                    "schema": {"fields": SCHEMA_FIELDS},
                 },
-                "location": None,
-                "encryptionConfiguration": None,
-                "schema": {"fields": SCHEMA_FIELDS},
-            }
+            },
         )
 
-    @mock.patch("airflow.providers.google.cloud.transfers.gcs_to_bigquery.BigQueryHook")
+    @mock.patch(GCS_TO_BQ_PATH.format("BigQueryHook"))
     def test_source_objs_as_list_without_external_table_should_execute_successfully(self, hook):
         hook.return_value.insert_job.side_effect = [
             MagicMock(job_id=pytest.real_job_id, error_result=False),
@@ -540,43 +523,38 @@ class TestGCSToBigQueryOperator(unittest.TestCase):
         calls = [
             call(
                 configuration={
-                    "load": dict(
-                        autodetect=True,
-                        createDisposition="CREATE_IF_NEEDED",
-                        destinationTable={"projectId": PROJECT_ID, 
"datasetId": DATASET, "tableId": TABLE},
-                        destinationTableProperties={
-                            "description": None,
-                            "labels": None,
+                    "load": {
+                        "autodetect": True,
+                        "createDisposition": "CREATE_IF_NEEDED",
+                        "destinationTable": {
+                            "projectId": "test-project",
+                            "datasetId": "dataset",
+                            "tableId": "table",
                         },
-                        sourceFormat="CSV",
-                        skipLeadingRows=None,
-                        sourceUris=[
-                            f"gs://{TEST_BUCKET}/{source_object}" for 
source_object in TEST_SOURCE_OBJECTS
-                        ],
-                        writeDisposition=WRITE_DISPOSITION,
-                        ignoreUnknownValues=False,
-                        allowQuotedNewlines=False,
-                        encoding="UTF-8",
-                        schema={"fields": SCHEMA_FIELDS},
-                        allowJaggedRows=False,
-                        fieldDelimiter=",",
-                        maxBadRecords=0,
-                        quote=None,
-                        schemaUpdateOptions=(),
-                    ),
+                        "sourceFormat": "CSV",
+                        "sourceUris": 
["gs://test-bucket/test/objects/test.csv"],
+                        "writeDisposition": "WRITE_TRUNCATE",
+                        "ignoreUnknownValues": False,
+                        "schema": {"fields": SCHEMA_FIELDS},
+                        "skipLeadingRows": None,
+                        "fieldDelimiter": ",",
+                        "quote": None,
+                        "allowQuotedNewlines": False,
+                        "encoding": "UTF-8",
+                    }
                 },
-                project_id=hook.return_value.project_id,
-                location=None,
                 job_id=pytest.real_job_id,
-                timeout=None,
-                retry=DEFAULT_RETRY,
+                location=None,
                 nowait=True,
-            ),
+                project_id=hook.return_value.project_id,
+                retry=DEFAULT_RETRY,
+                timeout=None,
+            )
         ]
 
         hook.return_value.insert_job.assert_has_calls(calls)
 
-    @mock.patch("airflow.providers.google.cloud.transfers.gcs_to_bigquery.BigQueryHook")
+    @mock.patch(GCS_TO_BQ_PATH.format("BigQueryHook"))
     def test_source_objs_as_string_external_table_should_execute_successfully(self, hook):
         hook.return_value.insert_job.side_effect = [
             MagicMock(job_id=pytest.real_job_id, error_result=False),
@@ -597,31 +575,32 @@ class TestGCSToBigQueryOperator(unittest.TestCase):
         operator.execute(context=MagicMock())
 
         hook.return_value.create_empty_table.assert_called_once_with(
+            exists_ok=True,
+            location=None,
+            project_id=PROJECT_ID,
             table_resource={
                 "tableReference": {"projectId": PROJECT_ID, "datasetId": 
DATASET, "tableId": TABLE},
-                "labels": None,
-                "description": None,
+                "labels": {},
                 "externalDataConfiguration": {
-                    "source_uris": 
[f"gs://{TEST_BUCKET}/{TEST_SOURCE_OBJECTS_AS_STRING}"],
-                    "source_format": "CSV",
-                    "maxBadRecords": 0,
                     "autodetect": True,
+                    "sourceFormat": "CSV",
+                    "sourceUris": 
[f"gs://{TEST_BUCKET}/{TEST_SOURCE_OBJECTS}"],
                     "compression": "NONE",
+                    "ignoreUnknownValues": False,
                     "csvOptions": {
-                        "fieldDelimeter": ",",
                         "skipLeadingRows": None,
+                        "fieldDelimiter": ",",
                         "quote": None,
                         "allowQuotedNewlines": False,
                         "allowJaggedRows": False,
+                        "encoding": "UTF-8",
                     },
+                    "schema": {"fields": SCHEMA_FIELDS},
                 },
-                "location": None,
-                "encryptionConfiguration": None,
-                "schema": {"fields": SCHEMA_FIELDS},
-            }
+            },
         )
 
-    @mock.patch("airflow.providers.google.cloud.transfers.gcs_to_bigquery.BigQueryHook")
+    @mock.patch(GCS_TO_BQ_PATH.format("BigQueryHook"))
     def test_source_objs_as_string_without_external_table_should_execute_successfully(self, hook):
         hook.return_value.insert_job.side_effect = [
             MagicMock(job_id=pytest.real_job_id, error_result=False),
@@ -644,42 +623,39 @@ class TestGCSToBigQueryOperator(unittest.TestCase):
         calls = [
             call(
                 configuration={
-                    "load": dict(
-                        autodetect=True,
-                        createDisposition="CREATE_IF_NEEDED",
-                        destinationTable={"projectId": PROJECT_ID, 
"datasetId": DATASET, "tableId": TABLE},
-                        destinationTableProperties={
-                            "description": None,
-                            "labels": None,
+                    "load": {
+                        "autodetect": True,
+                        "createDisposition": "CREATE_IF_NEEDED",
+                        "destinationTable": {
+                            "projectId": PROJECT_ID,
+                            "datasetId": DATASET,
+                            "tableId": "table",
                         },
-                        sourceFormat="CSV",
-                        skipLeadingRows=None,
-                        sourceUris=[f"gs://{TEST_BUCKET}/{TEST_SOURCE_OBJECTS_AS_STRING}"],
-                        writeDisposition=WRITE_DISPOSITION,
-                        ignoreUnknownValues=False,
-                        allowQuotedNewlines=False,
-                        encoding="UTF-8",
-                        schema={"fields": SCHEMA_FIELDS},
-                        allowJaggedRows=False,
-                        fieldDelimiter=",",
-                        maxBadRecords=0,
-                        quote=None,
-                        schemaUpdateOptions=(),
-                    ),
+                        "sourceFormat": "CSV",
+                        "sourceUris": 
[f"gs://{TEST_BUCKET}/{TEST_SOURCE_OBJECTS}"],
+                        "writeDisposition": "WRITE_TRUNCATE",
+                        "ignoreUnknownValues": False,
+                        "schema": {"fields": SCHEMA_FIELDS},
+                        "skipLeadingRows": None,
+                        "fieldDelimiter": ",",
+                        "quote": None,
+                        "allowQuotedNewlines": False,
+                        "encoding": "UTF-8",
+                    }
                 },
-                project_id=hook.return_value.project_id,
-                location=None,
                 job_id=pytest.real_job_id,
-                timeout=None,
-                retry=DEFAULT_RETRY,
+                location=None,
                 nowait=True,
-            ),
+                project_id=hook.return_value.project_id,
+                retry=DEFAULT_RETRY,
+                timeout=None,
+            )
         ]
 
         hook.return_value.insert_job.assert_has_calls(calls)
 
-    @mock.patch("airflow.providers.google.cloud.transfers.gcs_to_bigquery.GCSHook")
-    @mock.patch("airflow.providers.google.cloud.transfers.gcs_to_bigquery.BigQueryHook")
+    @mock.patch(GCS_TO_BQ_PATH.format("GCSHook"))
+    @mock.patch(GCS_TO_BQ_PATH.format("BigQueryHook"))
     def test_schema_obj_external_table_should_execute_successfully(self, bq_hook, gcs_hook):
         bq_hook.return_value.insert_job.side_effect = [
             MagicMock(job_id=pytest.real_job_id, error_result=False),
@@ -702,33 +678,34 @@ class TestGCSToBigQueryOperator(unittest.TestCase):
         operator.execute(context=MagicMock())
 
         bq_hook.return_value.create_empty_table.assert_called_once_with(
+            exists_ok=True,
+            location=None,
+            project_id=PROJECT_ID,
             table_resource={
                 "tableReference": {"projectId": PROJECT_ID, "datasetId": 
DATASET, "tableId": TABLE},
-                "labels": None,
-                "description": None,
+                "labels": {},
                 "externalDataConfiguration": {
-                    "source_uris": 
[f"gs://{TEST_BUCKET}/{TEST_SOURCE_OBJECTS_AS_STRING}"],
-                    "source_format": "CSV",
-                    "maxBadRecords": 0,
                     "autodetect": True,
+                    "sourceFormat": "CSV",
+                    "sourceUris": 
[f"gs://{TEST_BUCKET}/{TEST_SOURCE_OBJECTS}"],
                     "compression": "NONE",
+                    "ignoreUnknownValues": False,
                     "csvOptions": {
-                        "fieldDelimeter": ",",
                         "skipLeadingRows": None,
+                        "fieldDelimiter": ",",
                         "quote": None,
                         "allowQuotedNewlines": False,
                         "allowJaggedRows": False,
+                        "encoding": "UTF-8",
                     },
+                    "schema": {"fields": SCHEMA_FIELDS},
                 },
-                "location": None,
-                "encryptionConfiguration": None,
-                "schema": {"fields": SCHEMA_FIELDS},
-            }
+            },
         )
         gcs_hook.return_value.download.assert_called_once_with(SCHEMA_BUCKET, SCHEMA_OBJECT)
 
-    @mock.patch("airflow.providers.google.cloud.transfers.gcs_to_bigquery.GCSHook")
-    @mock.patch("airflow.providers.google.cloud.transfers.gcs_to_bigquery.BigQueryHook")
+    @mock.patch(GCS_TO_BQ_PATH.format("GCSHook"))
+    @mock.patch(GCS_TO_BQ_PATH.format("BigQueryHook"))
     def test_schema_obj_without_external_table_should_execute_successfully(self, bq_hook, gcs_hook):
         bq_hook.return_value.insert_job.side_effect = [
             MagicMock(job_id=pytest.real_job_id, error_result=False),
@@ -754,28 +731,21 @@ class TestGCSToBigQueryOperator(unittest.TestCase):
         calls = [
             call(
                 configuration={
-                    "load": dict(
-                        autodetect=True,
-                        createDisposition="CREATE_IF_NEEDED",
-                        destinationTable={"projectId": PROJECT_ID, "datasetId": DATASET, "tableId": TABLE},
-                        destinationTableProperties={
-                            "description": None,
-                            "labels": None,
-                        },
-                        sourceFormat="CSV",
-                        skipLeadingRows=None,
-                        sourceUris=[f"gs://{TEST_BUCKET}/{TEST_SOURCE_OBJECTS_AS_STRING}"],
-                        writeDisposition=WRITE_DISPOSITION,
-                        ignoreUnknownValues=False,
-                        allowQuotedNewlines=False,
-                        encoding="UTF-8",
-                        schema={"fields": SCHEMA_FIELDS},
-                        allowJaggedRows=False,
-                        fieldDelimiter=",",
-                        maxBadRecords=0,
-                        quote=None,
-                        schemaUpdateOptions=(),
-                    ),
+                    "load": {
+                        "autodetect": True,
+                        "createDisposition": "CREATE_IF_NEEDED",
+                        "destinationTable": {"projectId": PROJECT_ID, 
"datasetId": DATASET, "tableId": TABLE},
+                        "sourceFormat": "CSV",
+                        "sourceUris": 
[f"gs://{TEST_BUCKET}/{TEST_SOURCE_OBJECTS}"],
+                        "writeDisposition": "WRITE_TRUNCATE",
+                        "ignoreUnknownValues": False,
+                        "schema": {"fields": SCHEMA_FIELDS},
+                        "skipLeadingRows": None,
+                        "fieldDelimiter": ",",
+                        "quote": None,
+                        "allowQuotedNewlines": False,
+                        "encoding": "UTF-8",
+                    }
                 },
                 project_id=bq_hook.return_value.project_id,
                 location=None,
@@ -783,266 +753,113 @@ class TestGCSToBigQueryOperator(unittest.TestCase):
                 timeout=None,
                 retry=DEFAULT_RETRY,
                 nowait=True,
-            ),
+            )
         ]
 
         bq_hook.return_value.insert_job.assert_has_calls(calls)
         gcs_hook.return_value.download.assert_called_once_with(SCHEMA_BUCKET, SCHEMA_OBJECT)
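
    For context: both schema-object tests above pin the schema path down to a
    single GCSHook.download() call followed by a JSON parse. A minimal sketch of
    that flow, with illustrative names; not the committed implementation:

        import json

        def _load_schema_fields(gcs_hook, schema_bucket, schema_object):
            # download() returns bytes; the object is expected to hold a JSON
            # list of BigQuery fields, e.g. [{"name": "id", "type": "STRING"}].
            blob = gcs_hook.download(schema_bucket, schema_object)
            return json.loads(blob.decode("utf-8"))
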
 
-    @mock.patch("airflow.providers.google.cloud.transfers.gcs_to_bigquery.BigQueryHook")
-    def test_all_fields_should_be_present(self, hook):
+    @mock.patch(GCS_TO_BQ_PATH.format("BigQueryHook"))
+    def test_execute_should_throw_ex_when_no_bucket_specified(self, hook):
         hook.return_value.insert_job.side_effect = [
             MagicMock(job_id=pytest.real_job_id, error_result=False),
             pytest.real_job_id,
         ]
         hook.return_value.generate_job_id.return_value = pytest.real_job_id
         hook.return_value.split_tablename.return_value = (PROJECT_ID, DATASET, TABLE)
-        operator = GCSToBigQueryOperator(
-            task_id=TASK_ID,
-            bucket=TEST_BUCKET,
-            source_objects=TEST_SOURCE_OBJECTS,
-            schema_fields=SCHEMA_FIELDS,
-            destination_project_dataset_table=TEST_EXPLICIT_DEST,
-            write_disposition=WRITE_DISPOSITION,
-            external_table=False,
-            field_delimiter=";",
-            max_bad_records=13,
-            quote_character="|",
-            schema_update_options={"foo": "bar"},
-            allow_jagged_rows=True,
-            encryption_configuration={"bar": "baz"},
-            cluster_fields=["field_1", "field_2"],
-        )
-
-        operator.execute(context=MagicMock())
+        with pytest.raises(AirflowException, match=r"missing keyword argument 'bucket'"):
+            operator = GCSToBigQueryOperator(
+                task_id=TASK_ID,
+                source_objects=TEST_SOURCE_OBJECTS,
+                destination_project_dataset_table=TEST_EXPLICIT_DEST,
+                schema_fields=SCHEMA_FIELDS,
+                max_id_key=MAX_ID_KEY,
+                write_disposition=WRITE_DISPOSITION,
+                external_table=False,
+            )
+            operator.execute(context=MagicMock())
 
-        calls = [
-            call(
-                configuration={
-                    "load": dict(
-                        autodetect=True,
-                        createDisposition="CREATE_IF_NEEDED",
-                        destinationTable={"projectId": PROJECT_ID, "datasetId": DATASET, "tableId": TABLE},
-                        destinationTableProperties={
-                            "description": None,
-                            "labels": None,
-                        },
-                        sourceFormat="CSV",
-                        skipLeadingRows=None,
-                        sourceUris=[f"gs://{TEST_BUCKET}/{TEST_SOURCE_OBJECTS_AS_STRING}"],
-                        writeDisposition=WRITE_DISPOSITION,
-                        ignoreUnknownValues=False,
-                        allowQuotedNewlines=False,
-                        encoding="UTF-8",
-                        schema={"fields": SCHEMA_FIELDS},
-                        allowJaggedRows=True,
-                        fieldDelimiter=";",
-                        maxBadRecords=13,
-                        quote="|",
-                        schemaUpdateOptions={"foo": "bar"},
-                        destinationEncryptionConfiguration={"bar": "baz"},
-                        clustering={"fields": ["field_1", "field_2"]},
-                    ),
-                },
-                project_id=hook.return_value.project_id,
-                location=None,
-                job_id=pytest.real_job_id,
-                timeout=None,
-                retry=DEFAULT_RETRY,
-                nowait=True,
-            ),
+    @mock.patch(GCS_TO_BQ_PATH.format("BigQueryHook"))
+    def test_execute_should_throw_ex_when_no_source_objects_specified(self, hook):
+        hook.return_value.insert_job.side_effect = [
+            MagicMock(job_id=pytest.real_job_id, error_result=False),
+            pytest.real_job_id,
         ]
+        hook.return_value.generate_job_id.return_value = pytest.real_job_id
+        hook.return_value.split_tablename.return_value = (PROJECT_ID, DATASET, TABLE)
+        with pytest.raises(AirflowException, match=r"missing keyword argument 'source_objects'"):
+            operator = GCSToBigQueryOperator(
+                task_id=TASK_ID,
+                destination_project_dataset_table=TEST_EXPLICIT_DEST,
+                schema_fields=SCHEMA_FIELDS,
+                bucket=TEST_BUCKET,
+                max_id_key=MAX_ID_KEY,
+                write_disposition=WRITE_DISPOSITION,
+                external_table=False,
+            )
+            operator.execute(context=MagicMock())
 
-        hook.return_value.insert_job.assert_has_calls(calls)
-
-    @mock.patch("airflow.providers.google.cloud.transfers.gcs_to_bigquery.BigQueryHook")
-    def test_date_partitioned_explicit_setting_should_be_found(self, hook):
+    @mock.patch(GCS_TO_BQ_PATH.format("BigQueryHook"))
+    def test_execute_should_throw_ex_when_no_destination_project_dataset_table_specified(self, hook):
         hook.return_value.insert_job.side_effect = [
             MagicMock(job_id=pytest.real_job_id, error_result=False),
             pytest.real_job_id,
         ]
         hook.return_value.generate_job_id.return_value = pytest.real_job_id
         hook.return_value.split_tablename.return_value = (PROJECT_ID, DATASET, TABLE)
-        operator = GCSToBigQueryOperator(
-            task_id=TASK_ID,
-            bucket=TEST_BUCKET,
-            source_objects=TEST_SOURCE_OBJECTS,
-            schema_fields=SCHEMA_FIELDS,
-            destination_project_dataset_table=TEST_EXPLICIT_DEST,
-            write_disposition=WRITE_DISPOSITION,
-            external_table=False,
-            time_partitioning={"type": "DAY"},
-        )
-
-        operator.execute(context=MagicMock())
-
-        calls = [
-            call(
-                configuration={
-                    "load": dict(
-                        autodetect=True,
-                        createDisposition="CREATE_IF_NEEDED",
-                        destinationTable={"projectId": PROJECT_ID, "datasetId": DATASET, "tableId": TABLE},
-                        destinationTableProperties={
-                            "description": None,
-                            "labels": None,
-                        },
-                        sourceFormat="CSV",
-                        skipLeadingRows=None,
-                        sourceUris=[f"gs://{TEST_BUCKET}/{TEST_SOURCE_OBJECTS_AS_STRING}"],
-                        writeDisposition=WRITE_DISPOSITION,
-                        ignoreUnknownValues=False,
-                        allowQuotedNewlines=False,
-                        encoding="UTF-8",
-                        schema={"fields": SCHEMA_FIELDS},
-                        allowJaggedRows=False,
-                        fieldDelimiter=",",
-                        maxBadRecords=0,
-                        quote=None,
-                        schemaUpdateOptions=(),
-                        timePartitioning={"type": "DAY"},
-                    ),
-                },
-                project_id=hook.return_value.project_id,
-                location=None,
-                job_id=pytest.real_job_id,
-                timeout=None,
-                retry=DEFAULT_RETRY,
-                nowait=True,
-            ),
-        ]
-
-        hook.return_value.insert_job.assert_has_calls(calls)
-
-    @mock.patch("airflow.providers.google.cloud.transfers.gcs_to_bigquery.BigQueryHook")
-    def test_date_partitioned_implied_in_table_name_should_be_found(self, hook):
-        hook.return_value.insert_job.side_effect = [
-            MagicMock(job_id=pytest.real_job_id, error_result=False),
-            pytest.real_job_id,
-        ]
-        hook.return_value.generate_job_id.return_value = pytest.real_job_id
-        hook.return_value.split_tablename.return_value = (PROJECT_ID, DATASET, TABLE)
-        operator = GCSToBigQueryOperator(
-            task_id=TASK_ID,
-            bucket=TEST_BUCKET,
-            source_objects=TEST_SOURCE_OBJECTS,
-            schema_fields=SCHEMA_FIELDS,
-            destination_project_dataset_table=TEST_EXPLICIT_DEST + "$20221123",
-            write_disposition=WRITE_DISPOSITION,
-            external_table=False,
-        )
-
-        operator.execute(context=MagicMock())
-
-        calls = [
-            call(
-                configuration={
-                    "load": dict(
-                        autodetect=True,
-                        createDisposition="CREATE_IF_NEEDED",
-                        destinationTable={"projectId": PROJECT_ID, "datasetId": DATASET, "tableId": TABLE},
-                        destinationTableProperties={
-                            "description": None,
-                            "labels": None,
-                        },
-                        sourceFormat="CSV",
-                        skipLeadingRows=None,
-                        sourceUris=[f"gs://{TEST_BUCKET}/{TEST_SOURCE_OBJECTS_AS_STRING}"],
-                        writeDisposition=WRITE_DISPOSITION,
-                        ignoreUnknownValues=False,
-                        allowQuotedNewlines=False,
-                        encoding="UTF-8",
-                        schema={"fields": SCHEMA_FIELDS},
-                        allowJaggedRows=False,
-                        fieldDelimiter=",",
-                        maxBadRecords=0,
-                        quote=None,
-                        schemaUpdateOptions=(),
-                        timePartitioning={"type": "DAY"},
-                    ),
-                },
-                project_id=hook.return_value.project_id,
-                location=None,
-                job_id=pytest.real_job_id,
-                timeout=None,
-                retry=DEFAULT_RETRY,
-                nowait=True,
-            ),
-        ]
-
-        hook.return_value.insert_job.assert_has_calls(calls)
-
-    @mock.patch("airflow.providers.google.cloud.transfers.gcs_to_bigquery.BigQueryHook")
-    def test_execute_should_throw_ex_when_no_bucket_specified(self, hook):
-        hook.return_value.insert_job.side_effect = [
-            MagicMock(job_id=pytest.real_job_id, error_result=False),
-            pytest.real_job_id,
-        ]
-        hook.return_value.generate_job_id.return_value = pytest.real_job_id
-        hook.return_value.split_tablename.return_value = (PROJECT_ID, DATASET, TABLE)
-        with pytest.raises(AirflowException, match=r"missing keyword argument 'bucket'"):
-            operator = GCSToBigQueryOperator(
-                task_id=TASK_ID,
-                source_objects=TEST_SOURCE_OBJECTS,
-                destination_project_dataset_table=TEST_EXPLICIT_DEST,
-                schema_fields=SCHEMA_FIELDS,
-                max_id_key=MAX_ID_KEY,
-                write_disposition=WRITE_DISPOSITION,
-                external_table=False,
-            )
-            operator.execute(context=MagicMock())
-
-    @mock.patch("airflow.providers.google.cloud.transfers.gcs_to_bigquery.BigQueryHook")
-    def test_execute_should_throw_ex_when_no_source_objects_specified(self, hook):
-
-        hook.return_value.insert_job.side_effect = [
-            MagicMock(job_id=pytest.real_job_id, error_result=False),
-            pytest.real_job_id,
-        ]
-        hook.return_value.generate_job_id.return_value = pytest.real_job_id
-        hook.return_value.split_tablename.return_value = (PROJECT_ID, DATASET, TABLE)
-        with pytest.raises(AirflowException, match=r"missing keyword argument 'source_objects'"):
+        with pytest.raises(
+            AirflowException, match=r"missing keyword argument 'destination_project_dataset_table'"
+        ):
             operator = GCSToBigQueryOperator(
                 task_id=TASK_ID,
-                destination_project_dataset_table=TEST_EXPLICIT_DEST,
                 schema_fields=SCHEMA_FIELDS,
                 bucket=TEST_BUCKET,
+                source_objects=TEST_SOURCE_OBJECTS,
                 max_id_key=MAX_ID_KEY,
                 write_disposition=WRITE_DISPOSITION,
                 external_table=False,
             )
             operator.execute(context=MagicMock())
 
-    @mock.patch("airflow.providers.google.cloud.transfers.gcs_to_bigquery.BigQueryHook")
-    def test_execute_should_throw_ex_when_no_destination_project_dataset_table_specified(self, hook):
+    @mock.patch(GCS_TO_BQ_PATH.format("BigQueryHook"))
+    def test_source_format_check_should_throw_ex_when_incorrect_source_type(
+        self,
+        hook,
+    ):
         hook.return_value.insert_job.side_effect = [
             MagicMock(job_id=pytest.real_job_id, error_result=False),
             pytest.real_job_id,
         ]
         hook.return_value.generate_job_id.return_value = pytest.real_job_id
         hook.return_value.split_tablename.return_value = (PROJECT_ID, DATASET, TABLE)
+        hook.return_value.get_job.return_value.result.return_value = ("1",)
+
         with pytest.raises(
-            AirflowException, match=r"missing keyword argument 'destination_project_dataset_table'"
+            ValueError,
+            match=r"is not a valid source format.",
         ):
             operator = GCSToBigQueryOperator(
                 task_id=TASK_ID,
-                schema_fields=SCHEMA_FIELDS,
                 bucket=TEST_BUCKET,
                 source_objects=TEST_SOURCE_OBJECTS,
+                destination_project_dataset_table=TEST_EXPLICIT_DEST,
                 max_id_key=MAX_ID_KEY,
                 write_disposition=WRITE_DISPOSITION,
                 external_table=False,
+                autodetect=False,
+                source_format="incorrect",
             )
             operator.execute(context=MagicMock())
 
-    @mock.patch("airflow.providers.google.cloud.transfers.gcs_to_bigquery.GCSHook")
-    @mock.patch("airflow.providers.google.cloud.transfers.gcs_to_bigquery.BigQueryHook")
-    def test_schema_fields_scanner_external_table_should_execute_successfully(self, bq_hook, gcs_hook):
+    @mock.patch(GCS_TO_BQ_PATH.format("GCSHook"))
+    @mock.patch(GCS_TO_BQ_PATH.format("BigQueryHook"))
+    def test_schema_fields_integer_scanner_external_table_should_execute_successfully(
+        self, bq_hook, gcs_hook
+    ):
         """
         Check detection of schema fields if schema_fields parameter is not
-        specified and fields are read from source objects correctly by the operator
-        if all fields are characters. In this case operator searches for fields in source object
-        and update configuration with constructed schema_fields.
+        specified and fields are read from source objects correctly by BigQuery if at least
+        one field includes a non-string value.
         """
         bq_hook.return_value.insert_job.side_effect = [
             MagicMock(job_id=pytest.real_job_id, error_result=False),
@@ -1051,16 +868,15 @@ class TestGCSToBigQueryOperator(unittest.TestCase):
         bq_hook.return_value.generate_job_id.return_value = pytest.real_job_id
         bq_hook.return_value.split_tablename.return_value = (PROJECT_ID, DATASET, TABLE)
         bq_hook.return_value.get_job.return_value.result.return_value = ("1",)
-
-        gcs_hook.return_value.download.return_value = b"id,name\r\none,Anna"
+        gcs_hook.return_value.download.return_value = b"id,name\r\n1,Anna"
 
         operator = GCSToBigQueryOperator(
             task_id=TASK_ID,
             bucket=TEST_BUCKET,
             source_objects=TEST_SOURCE_OBJECTS,
             destination_project_dataset_table=TEST_EXPLICIT_DEST,
-            max_id_key=MAX_ID_KEY,
             write_disposition=WRITE_DISPOSITION,
+            max_id_key=MAX_ID_KEY,
             external_table=True,
             autodetect=True,
         )
@@ -1069,28 +885,28 @@ class TestGCSToBigQueryOperator(unittest.TestCase):
 
         assert result == "1"
         bq_hook.return_value.create_empty_table.assert_called_once_with(
+            exists_ok=True,
+            location=None,
+            project_id=PROJECT_ID,
             table_resource={
                 "tableReference": {"projectId": PROJECT_ID, "datasetId": 
DATASET, "tableId": TABLE},
-                "labels": None,
-                "description": None,
+                "labels": {},
                 "externalDataConfiguration": {
-                    "source_uris": 
[f"gs://{TEST_BUCKET}/{TEST_SOURCE_OBJECTS_AS_STRING}"],
-                    "source_format": "CSV",
-                    "maxBadRecords": 0,
                     "autodetect": True,
+                    "sourceFormat": "CSV",
+                    "sourceUris": 
[f"gs://{TEST_BUCKET}/{TEST_SOURCE_OBJECTS}"],
                     "compression": "NONE",
+                    "ignoreUnknownValues": False,
                     "csvOptions": {
-                        "fieldDelimeter": ",",
-                        "skipLeadingRows": 1,
+                        "skipLeadingRows": None,
+                        "fieldDelimiter": ",",
                         "quote": None,
                         "allowQuotedNewlines": False,
                         "allowJaggedRows": False,
+                        "encoding": "UTF-8",
                     },
                 },
-                "location": None,
-                "encryptionConfiguration": None,
-                "schema": {"fields": SCHEMA_FIELDS},
-            }
+            },
         )
         bq_hook.return_value.insert_job.assert_called_once_with(
             configuration={
@@ -1103,16 +919,15 @@ class TestGCSToBigQueryOperator(unittest.TestCase):
             project_id=bq_hook.return_value.project_id,
         )
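
    For context: when max_id_key is set, the assertions in this file expect a
    follow-up query job of the form SELECT MAX(<max_id_key>) AS max_value FROM
    <destination>. A minimal sketch of that step, with illustrative names; not
    the committed code:

        def _fetch_max_id(hook, max_id_key, destination, project_id):
            configuration = {
                "query": {
                    "query": f"SELECT MAX({max_id_key}) AS max_value FROM {destination}",
                    "useLegacySql": False,
                    "schemaUpdateOptions": [],
                }
            }
            job = hook.insert_job(configuration=configuration, project_id=project_id)
            # These tests stub get_job(...).result() to return ("1",), and the
            # operator is asserted to return "1": the first (and only) value.
            rows = list(hook.get_job(job_id=job.job_id).result())
            return rows[0] if rows else None
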
 
-    @mock.patch("airflow.providers.google.cloud.transfers.gcs_to_bigquery.GCSHook")
-    @mock.patch("airflow.providers.google.cloud.transfers.gcs_to_bigquery.BigQueryHook")
-    def test_schema_fields_scanner_without_external_table_should_execute_successfully(
+    @mock.patch(GCS_TO_BQ_PATH.format("GCSHook"))
+    @mock.patch(GCS_TO_BQ_PATH.format("BigQueryHook"))
+    def test_schema_fields_integer_scanner_without_external_table_should_execute_successfully(
         self, bq_hook, gcs_hook
     ):
         """
         Check detection of schema fields if schema_fields parameter is not
-        specified and fields are read from source objects correctly by the operator
-        if all fields are characters. In this case operator searches for fields in source object
-        and update configuration with constructed schema_fields.
+        specified and fields are read from source objects correctly by BigQuery if at least
+        one field includes a non-string value.
         """
         bq_hook.return_value.insert_job.side_effect = [
             MagicMock(job_id=pytest.real_job_id, error_result=False),
@@ -1121,8 +936,7 @@ class TestGCSToBigQueryOperator(unittest.TestCase):
         bq_hook.return_value.generate_job_id.return_value = pytest.real_job_id
         bq_hook.return_value.split_tablename.return_value = (PROJECT_ID, DATASET, TABLE)
         bq_hook.return_value.get_job.return_value.result.return_value = ("1",)
-
-        gcs_hook.return_value.download.return_value = b"id,name\r\none,Anna"
+        gcs_hook.return_value.download.return_value = b"id,name\r\n1,Anna"
 
         operator = GCSToBigQueryOperator(
             task_id=TASK_ID,
@@ -1141,35 +955,27 @@ class TestGCSToBigQueryOperator(unittest.TestCase):
         calls = [
             call(
                 configuration={
-                    "load": dict(
-                        autodetect=True,
-                        createDisposition="CREATE_IF_NEEDED",
-                        destinationTable={"projectId": PROJECT_ID, "datasetId": DATASET, "tableId": TABLE},
-                        destinationTableProperties={
-                            "description": None,
-                            "labels": None,
-                        },
-                        sourceFormat="CSV",
-                        skipLeadingRows=1,
-                        sourceUris=[f"gs://{TEST_BUCKET}/{TEST_SOURCE_OBJECTS_AS_STRING}"],
-                        writeDisposition=WRITE_DISPOSITION,
-                        ignoreUnknownValues=False,
-                        allowQuotedNewlines=False,
-                        encoding="UTF-8",
-                        schema={"fields": SCHEMA_FIELDS},
-                        allowJaggedRows=False,
-                        fieldDelimiter=",",
-                        maxBadRecords=0,
-                        quote=None,
-                        schemaUpdateOptions=(),
-                    ),
+                    "load": {
+                        "autodetect": True,
+                        "createDisposition": "CREATE_IF_NEEDED",
+                        "destinationTable": {"projectId": PROJECT_ID, 
"datasetId": DATASET, "tableId": TABLE},
+                        "sourceFormat": "CSV",
+                        "sourceUris": 
[f"gs://{TEST_BUCKET}/{TEST_SOURCE_OBJECTS}"],
+                        "writeDisposition": WRITE_DISPOSITION,
+                        "ignoreUnknownValues": False,
+                        "skipLeadingRows": None,
+                        "fieldDelimiter": ",",
+                        "quote": None,
+                        "allowQuotedNewlines": False,
+                        "encoding": "UTF-8",
+                    }
                 },
-                project_id=bq_hook.return_value.project_id,
-                location=None,
                 job_id=pytest.real_job_id,
-                timeout=None,
-                retry=DEFAULT_RETRY,
+                location=None,
                 nowait=True,
+                project_id=bq_hook.return_value.project_id,
+                retry=DEFAULT_RETRY,
+                timeout=None,
             ),
             call(
                 configuration={
@@ -1185,11 +991,8 @@ class TestGCSToBigQueryOperator(unittest.TestCase):
 
         bq_hook.return_value.insert_job.assert_has_calls(calls)
 
-    @mock.patch("airflow.providers.google.cloud.transfers.gcs_to_bigquery.BigQueryHook")
-    def test_schema_fields_scanner_external_table_should_throw_ex_when_autodetect_not_specified(
-        self,
-        hook,
-    ):
+    @mock.patch(GCS_TO_BQ_PATH.format("BigQueryHook"))
+    def test_schema_fields_without_external_table_should_execute_successfully(self, hook):
         hook.return_value.insert_job.side_effect = [
             MagicMock(job_id=pytest.real_job_id, error_result=False),
             pytest.real_job_id,
@@ -1198,24 +1001,50 @@ class TestGCSToBigQueryOperator(unittest.TestCase):
         hook.return_value.split_tablename.return_value = (PROJECT_ID, DATASET, TABLE)
         hook.return_value.get_job.return_value.result.return_value = ("1",)
 
-        with pytest.raises(RuntimeError, match=r"Table schema was not found."):
-            operator = GCSToBigQueryOperator(
-                task_id=TASK_ID,
-                bucket=TEST_BUCKET,
-                source_objects=TEST_SOURCE_OBJECTS,
-                destination_project_dataset_table=TEST_EXPLICIT_DEST,
-                max_id_key=MAX_ID_KEY,
-                write_disposition=WRITE_DISPOSITION,
-                external_table=True,
-                autodetect=False,
+        operator = GCSToBigQueryOperator(
+            task_id=TASK_ID,
+            bucket=TEST_BUCKET,
+            source_objects=TEST_SOURCE_OBJECTS,
+            destination_project_dataset_table=TEST_EXPLICIT_DEST,
+            write_disposition=WRITE_DISPOSITION,
+            schema_fields=SCHEMA_FIELDS_INT,
+            external_table=False,
+            autodetect=True,
+        )
+
+        operator.execute(context=MagicMock())
+        calls = [
+            call(
+                configuration={
+                    "load": {
+                        "autodetect": True,
+                        "createDisposition": "CREATE_IF_NEEDED",
+                        "destinationTable": {"projectId": PROJECT_ID, 
"datasetId": DATASET, "tableId": TABLE},
+                        "sourceFormat": "CSV",
+                        "sourceUris": 
[f"gs://{TEST_BUCKET}/{TEST_SOURCE_OBJECTS}"],
+                        "writeDisposition": WRITE_DISPOSITION,
+                        "ignoreUnknownValues": False,
+                        "schema": {"fields": SCHEMA_FIELDS_INT},
+                        "skipLeadingRows": None,
+                        "fieldDelimiter": ",",
+                        "quote": None,
+                        "allowQuotedNewlines": False,
+                        "encoding": "UTF-8",
+                    }
+                },
+                job_id=pytest.real_job_id,
+                location=None,
+                nowait=True,
+                project_id=hook.return_value.project_id,
+                retry=DEFAULT_RETRY,
+                timeout=None,
             )
-            operator.execute(context=MagicMock())
+        ]
 
-    @mock.patch("airflow.providers.google.cloud.transfers.gcs_to_bigquery.BigQueryHook")
-    def test_schema_fields_scanner_without_external_table_should_throw_ex_when_autodetect_not_specified(
-        self,
-        hook,
-    ):
+        hook.return_value.insert_job.assert_has_calls(calls)
+
+    @mock.patch(GCS_TO_BQ_PATH.format("BigQueryHook"))
+    def test_schema_fields_external_table_should_execute_successfully(self, hook):
         hook.return_value.insert_job.side_effect = [
             MagicMock(job_id=pytest.real_job_id, error_result=False),
             pytest.real_job_id,
@@ -1224,104 +1053,56 @@ class TestGCSToBigQueryOperator(unittest.TestCase):
         hook.return_value.split_tablename.return_value = (PROJECT_ID, DATASET, TABLE)
         hook.return_value.get_job.return_value.result.return_value = ("1",)
 
-        with pytest.raises(RuntimeError, match=r"Table schema was not found."):
-            operator = GCSToBigQueryOperator(
-                task_id=TASK_ID,
-                bucket=TEST_BUCKET,
-                source_objects=TEST_SOURCE_OBJECTS,
-                destination_project_dataset_table=TEST_EXPLICIT_DEST,
-                max_id_key=MAX_ID_KEY,
-                write_disposition=WRITE_DISPOSITION,
-                external_table=False,
-                autodetect=False,
-            )
-            operator.execute(context=MagicMock())
-
-    @mock.patch("airflow.providers.google.cloud.transfers.gcs_to_bigquery.GCSHook")
-    @mock.patch("airflow.providers.google.cloud.transfers.gcs_to_bigquery.BigQueryHook")
-    def test_schema_fields_integer_scanner_external_table_should_execute_successfully(
-        self, bq_hook, gcs_hook
-    ):
-        """
-        Check detection of schema fields if schema_fields parameter is not
-        specified and fields are read from source objects correctly by BigQuery if at least
-        one field includes non-string value.
-        """
-        bq_hook.return_value.insert_job.side_effect = [
-            MagicMock(job_id=pytest.real_job_id, error_result=False),
-            pytest.real_job_id,
-        ]
-        bq_hook.return_value.generate_job_id.return_value = pytest.real_job_id
-        bq_hook.return_value.split_tablename.return_value = (PROJECT_ID, DATASET, TABLE)
-        bq_hook.return_value.get_job.return_value.result.return_value = ("1",)
-        gcs_hook.return_value.download.return_value = b"id,name\r\n1,Anna"
-
         operator = GCSToBigQueryOperator(
             task_id=TASK_ID,
             bucket=TEST_BUCKET,
             source_objects=TEST_SOURCE_OBJECTS,
             destination_project_dataset_table=TEST_EXPLICIT_DEST,
             write_disposition=WRITE_DISPOSITION,
-            max_id_key=MAX_ID_KEY,
+            schema_fields=SCHEMA_FIELDS_INT,
             external_table=True,
             autodetect=True,
         )
 
-        result = operator.execute(context=MagicMock())
-
-        assert result == "1"
-        bq_hook.return_value.create_empty_table.assert_called_once_with(
+        operator.execute(context=MagicMock())
+        hook.return_value.create_empty_table.assert_called_once_with(
+            exists_ok=True,
+            location=None,
+            project_id=PROJECT_ID,
             table_resource={
                 "tableReference": {"projectId": PROJECT_ID, "datasetId": 
DATASET, "tableId": TABLE},
-                "labels": None,
-                "description": None,
+                "labels": {},
                 "externalDataConfiguration": {
-                    "source_uris": 
[f"gs://{TEST_BUCKET}/{TEST_SOURCE_OBJECTS_AS_STRING}"],
-                    "source_format": "CSV",
-                    "maxBadRecords": 0,
                     "autodetect": True,
+                    "sourceFormat": "CSV",
+                    "sourceUris": 
[f"gs://{TEST_BUCKET}/{TEST_SOURCE_OBJECTS}"],
                     "compression": "NONE",
+                    "ignoreUnknownValues": False,
                     "csvOptions": {
-                        "fieldDelimeter": ",",
                         "skipLeadingRows": None,
+                        "fieldDelimiter": ",",
                         "quote": None,
                         "allowQuotedNewlines": False,
                         "allowJaggedRows": False,
+                        "encoding": "UTF-8",
                     },
+                    "schema": {"fields": SCHEMA_FIELDS_INT},
                 },
-                "location": None,
-                "encryptionConfiguration": None,
-            }
-        )
-        bq_hook.return_value.insert_job.assert_called_once_with(
-            configuration={
-                "query": {
-                    "query": f"SELECT MAX({MAX_ID_KEY}) AS max_value FROM 
{TEST_EXPLICIT_DEST}",
-                    "useLegacySql": False,
-                    "schemaUpdateOptions": [],
-                }
-            },
-            project_id=bq_hook.return_value.project_id,
+            },
         )
 
-    @mock.patch("airflow.providers.google.cloud.transfers.gcs_to_bigquery.GCSHook")
-    @mock.patch("airflow.providers.google.cloud.transfers.gcs_to_bigquery.BigQueryHook")
-    def test_schema_fields_integer_scanner_without_external_table_should_execute_successfully(
-        self, bq_hook, gcs_hook
-    ):
+
+class TestAsyncGCSToBigQueryOperator(unittest.TestCase):
+    @mock.patch(GCS_TO_BQ_PATH.format("BigQueryHook"))
+    def test_execute_without_external_table_async_should_execute_successfully(self, hook):
         """
-        Check detection of schema fields if schema_fields parameter is not
-        specified and fields are read from source objects correctly by BigQuery if at least
-        one field includes non-string value.
+        Asserts that a task is deferred and a BigQueryInsertJobTrigger will be fired
+        when the operator is executed in deferrable mode.
         """
-        bq_hook.return_value.insert_job.side_effect = [
-            MagicMock(job_id=pytest.real_job_id, error_result=False),
-            pytest.real_job_id,
-        ]
-        bq_hook.return_value.generate_job_id.return_value = pytest.real_job_id
-        bq_hook.return_value.split_tablename.return_value = (PROJECT_ID, DATASET, TABLE)
-        bq_hook.return_value.get_job.return_value.result.return_value = ("1",)
-        gcs_hook.return_value.download.return_value = b"id,name\r\n1,Anna"
+        hook.return_value.insert_job.return_value = MagicMock(job_id=pytest.real_job_id, error_result=False)
+        hook.return_value.generate_job_id.return_value = pytest.real_job_id
+        hook.return_value.split_tablename.return_value = (PROJECT_ID, DATASET, TABLE)
+        hook.return_value.get_job.return_value.result.return_value = ("1",)
 
         operator = GCSToBigQueryOperator(
             task_id=TASK_ID,
@@ -1329,69 +1110,76 @@ class TestGCSToBigQueryOperator(unittest.TestCase):
             source_objects=TEST_SOURCE_OBJECTS,
             destination_project_dataset_table=TEST_EXPLICIT_DEST,
             write_disposition=WRITE_DISPOSITION,
-            max_id_key=MAX_ID_KEY,
+            schema_fields=SCHEMA_FIELDS,
             external_table=False,
             autodetect=True,
+            deferrable=True,
         )
 
-        result = operator.execute(context=MagicMock())
+        with pytest.raises(TaskDeferred) as exc:
+            operator.execute(self.create_context(operator))
 
-        assert result == "1"
-        calls = [
-            call(
-                configuration={
-                    "load": dict(
-                        autodetect=True,
-                        createDisposition="CREATE_IF_NEEDED",
-                        destinationTable={"projectId": PROJECT_ID, "datasetId": DATASET, "tableId": TABLE},
-                        destinationTableProperties={
-                            "description": None,
-                            "labels": None,
-                        },
-                        sourceFormat="CSV",
-                        skipLeadingRows=None,
-                        sourceUris=[f"gs://{TEST_BUCKET}/{TEST_SOURCE_OBJECTS_AS_STRING}"],
-                        writeDisposition=WRITE_DISPOSITION,
-                        ignoreUnknownValues=False,
-                        allowQuotedNewlines=False,
-                        encoding="UTF-8",
-                        allowJaggedRows=False,
-                        fieldDelimiter=",",
-                        maxBadRecords=0,
-                        quote=None,
-                        schemaUpdateOptions=(),
-                    ),
-                },
-                project_id=bq_hook.return_value.project_id,
-                location=None,
-                job_id=pytest.real_job_id,
-                timeout=None,
-                retry=DEFAULT_RETRY,
-                nowait=True,
-            ),
-            call(
-                configuration={
-                    "query": {
-                        "query": f"SELECT MAX({MAX_ID_KEY}) AS max_value FROM 
{TEST_EXPLICIT_DEST}",
-                        "useLegacySql": False,
-                        "schemaUpdateOptions": [],
-                    }
-                },
-                project_id=bq_hook.return_value.project_id,
-            ),
-        ]
+        assert isinstance(
+            exc.value.trigger, BigQueryInsertJobTrigger
+        ), "Trigger is not a BigQueryInsertJobTrigger"
 
-        bq_hook.return_value.insert_job.assert_has_calls(calls)
+    def test_execute_without_external_table_async_should_throw_ex_when_event_status_error(self):
+        """
+        Tests that an AirflowException is raised in case of an error event.
+        """
 
-    @mock.patch("airflow.providers.google.cloud.transfers.gcs_to_bigquery.BigQueryHook")
-    def test_schema_fields_without_external_table_should_execute_successfully(self, hook):
-        hook.return_value.insert_job.side_effect = [
-            MagicMock(job_id=pytest.real_job_id, error_result=False),
-            pytest.real_job_id,
-        ]
-        hook.return_value.generate_job_id.return_value = pytest.real_job_id
+        with pytest.raises(AirflowException):
+            operator = GCSToBigQueryOperator(
+                task_id=TASK_ID,
+                bucket=TEST_BUCKET,
+                source_objects=TEST_SOURCE_OBJECTS,
+                destination_project_dataset_table=TEST_EXPLICIT_DEST,
+                write_disposition=WRITE_DISPOSITION,
+                schema_fields=SCHEMA_FIELDS,
+                external_table=False,
+                autodetect=True,
+                deferrable=True,
+            )
+            operator.execute_complete(
+                context=None, event={"status": "error", "message": "test 
failure message"}
+            )
+
+    def test_execute_logging_without_external_table_async_should_execute_successfully(self):
+        """
+        Asserts that logging occurs as expected.
+        """
+
+        operator = GCSToBigQueryOperator(
+            task_id=TASK_ID,
+            bucket=TEST_BUCKET,
+            source_objects=TEST_SOURCE_OBJECTS,
+            destination_project_dataset_table=TEST_EXPLICIT_DEST,
+            write_disposition=WRITE_DISPOSITION,
+            schema_fields=SCHEMA_FIELDS,
+            external_table=False,
+            autodetect=True,
+            deferrable=True,
+        )
+        with mock.patch.object(operator.log, "info") as mock_log_info:
+            operator.execute_complete(
+                context=self.create_context(operator),
+                event={"status": "success", "message": "Job completed", 
"job_id": job_id},
+            )
+        mock_log_info.assert_called_with(
+            "%s completed with response %s ", "test-gcs-to-bq-operator", "Job 
completed"
+        )
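
    For context: the two tests above fix the execute_complete() contract: an
    "error" event raises AirflowException, while a "success" event logs the
    exact message asserted here. A sketch of that contract, illustrative rather
    than the committed method:

        def execute_complete(self, context, event):
            if event["status"] == "error":
                raise AirflowException(event["message"])
            # Note the trailing space in the format string; the test asserts it.
            self.log.info("%s completed with response %s ", self.task_id, event["message"])
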
+
+    @mock.patch(GCS_TO_BQ_PATH.format("BigQueryHook"))
+    def test_execute_without_external_table_generate_job_id_async_should_execute_successfully(self, hook):
+        hook.return_value.insert_job.side_effect = Conflict("any")
         hook.return_value.split_tablename.return_value = (PROJECT_ID, DATASET, TABLE)
-        hook.return_value.get_job.return_value.result.return_value = ("1",)
+        job = MagicMock(
+            job_id=pytest.real_job_id,
+            error_result=False,
+            state="PENDING",
+            done=lambda: False,
+        )
+        hook.return_value.get_job.return_value = job
 
         operator = GCSToBigQueryOperator(
             task_id=TASK_ID,
@@ -1399,58 +1187,38 @@ class TestGCSToBigQueryOperator(unittest.TestCase):
             source_objects=TEST_SOURCE_OBJECTS,
             destination_project_dataset_table=TEST_EXPLICIT_DEST,
             write_disposition=WRITE_DISPOSITION,
-            schema_fields=SCHEMA_FIELDS_INT,
+            schema_fields=SCHEMA_FIELDS,
+            reattach_states={"PENDING"},
             external_table=False,
             autodetect=True,
+            deferrable=True,
         )
 
-        operator.execute(context=MagicMock())
-        calls = [
-            call(
-                configuration={
-                    "load": dict(
-                        autodetect=True,
-                        createDisposition="CREATE_IF_NEEDED",
-                        destinationTable={"projectId": PROJECT_ID, "datasetId": DATASET, "tableId": TABLE},
-                        destinationTableProperties={
-                            "description": None,
-                            "labels": None,
-                        },
-                        sourceFormat="CSV",
-                        skipLeadingRows=None,
-                        sourceUris=[f"gs://{TEST_BUCKET}/{TEST_SOURCE_OBJECTS_AS_STRING}"],
-                        writeDisposition=WRITE_DISPOSITION,
-                        ignoreUnknownValues=False,
-                        allowQuotedNewlines=False,
-                        encoding="UTF-8",
-                        schema={"fields": SCHEMA_FIELDS_INT},
-                        allowJaggedRows=False,
-                        fieldDelimiter=",",
-                        maxBadRecords=0,
-                        quote=None,
-                        schemaUpdateOptions=(),
-                    ),
-                },
-                project_id=hook.return_value.project_id,
-                location=None,
-                job_id=pytest.real_job_id,
-                timeout=None,
-                retry=DEFAULT_RETRY,
-                nowait=True,
-            ),
-        ]
+        with pytest.raises(TaskDeferred):
+            operator.execute(self.create_context(operator))
 
-        hook.return_value.insert_job.assert_has_calls(calls)
+        hook.return_value.generate_job_id.assert_called_once_with(
+            job_id=None,
+            dag_id="adhoc_airflow",
+            task_id=TASK_ID,
+            logical_date=datetime(2022, 1, 1, 0, 0),
+            configuration={},
+            force_rerun=True,
+        )
 
-    @mock.patch("airflow.providers.google.cloud.transfers.gcs_to_bigquery.BigQueryHook")
-    def test_schema_fields_external_table_should_execute_successfully(self, hook):
-        hook.return_value.insert_job.side_effect = [
-            MagicMock(job_id=pytest.real_job_id, error_result=False),
-            pytest.real_job_id,
-        ]
+    @mock.patch(GCS_TO_BQ_PATH.format("BigQueryHook"))
+    def test_execute_without_external_table_reattach_async_should_execute_successfully(self, hook):
         hook.return_value.generate_job_id.return_value = pytest.real_job_id
+
+        hook.return_value.insert_job.side_effect = Conflict("any")
         hook.return_value.split_tablename.return_value = (PROJECT_ID, DATASET, TABLE)
-        hook.return_value.get_job.return_value.result.return_value = ("1",)
+        job = MagicMock(
+            job_id=pytest.real_job_id,
+            error_result=False,
+            state="PENDING",
+            done=lambda: False,
+        )
+        hook.return_value.get_job.return_value = job
 
         operator = GCSToBigQueryOperator(
             task_id=TASK_ID,
@@ -1458,75 +1226,39 @@ class TestGCSToBigQueryOperator(unittest.TestCase):
             source_objects=TEST_SOURCE_OBJECTS,
             destination_project_dataset_table=TEST_EXPLICIT_DEST,
             write_disposition=WRITE_DISPOSITION,
-            schema_fields=SCHEMA_FIELDS_INT,
-            external_table=True,
+            schema_fields=SCHEMA_FIELDS,
+            location=TEST_DATASET_LOCATION,
+            reattach_states={"PENDING"},
+            external_table=False,
             autodetect=True,
+            deferrable=True,
         )
 
-        operator.execute(context=MagicMock())
-        hook.return_value.create_empty_table.assert_called_once_with(
-            table_resource={
-                "tableReference": {"projectId": PROJECT_ID, "datasetId": 
DATASET, "tableId": TABLE},
-                "labels": None,
-                "description": None,
-                "externalDataConfiguration": {
-                    "source_uris": 
[f"gs://{TEST_BUCKET}/{TEST_SOURCE_OBJECTS_AS_STRING}"],
-                    "source_format": "CSV",
-                    "maxBadRecords": 0,
-                    "autodetect": True,
-                    "compression": "NONE",
-                    "csvOptions": {
-                        "fieldDelimeter": ",",
-                        "skipLeadingRows": None,
-                        "quote": None,
-                        "allowQuotedNewlines": False,
-                        "allowJaggedRows": False,
-                    },
-                },
-                "location": None,
-                "encryptionConfiguration": None,
-                "schema": {"fields": SCHEMA_FIELDS_INT},
-            }
+        with pytest.raises(TaskDeferred):
+            operator.execute(self.create_context(operator))
+
+        hook.return_value.get_job.assert_called_once_with(
+            location=TEST_DATASET_LOCATION,
+            job_id=pytest.real_job_id,
+            project_id=hook.return_value.project_id,
         )
 
+        job._begin.assert_called_once_with()
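
    For context: the reattach and force_rerun tests describe the expected
    handling of a Conflict from insert_job(): fetch the existing job, restart it
    via _begin() when its state is in reattach_states, otherwise raise with the
    message asserted below. A sketch under those assumptions, with illustrative
    names; not the committed code:

        from google.api_core.exceptions import Conflict

        def _submit_or_reattach(self, hook, job_id, configuration, project_id, location):
            try:
                return hook.insert_job(
                    configuration=configuration,
                    job_id=job_id,
                    project_id=project_id,
                    location=location,
                    nowait=True,
                )
            except Conflict:
                job = hook.get_job(location=location, job_id=job_id, project_id=project_id)
                if job.state in self.reattach_states:
                    job._begin()  # reattach to the existing job
                    return job
                raise AirflowException(
                    f"Job with id: {job_id} already exists and is in {job.state} state. "
                    f"If you want to force rerun it consider setting `force_rerun=True`."
                    f"Or, if you want to reattach in this scenario add {job.state} to `reattach_states`"
                )
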
+
+    @mock.patch(GCS_TO_BQ_PATH.format("BigQueryHook"))
+    def test_execute_without_external_table_force_rerun_async_should_execute_successfully(self, hook):
+        hook.return_value.generate_job_id.return_value = f"{job_id}_{hash_}"
+        hook.return_value.split_tablename.return_value = (PROJECT_ID, DATASET, TABLE)
+
+        hook.return_value.insert_job.side_effect = Conflict("any")
+        job = MagicMock(
+            job_id=pytest.real_job_id,
+            error_result=False,
+            state="DONE",
+            done=lambda: False,
+        )
+        hook.return_value.get_job.return_value = job
 
[email protected]("airflow.providers.google.cloud.transfers.gcs_to_bigquery.BigQueryHook")
-def test_execute_without_external_table_async_should_execute_successfully(hook):
-    """
-    Asserts that a task is deferred and a BigQueryInsertJobTrigger will be fired
-    when Operator is executed in deferrable.
-    """
-    hook.return_value.insert_job.return_value = MagicMock(job_id=pytest.real_job_id, error_result=False)
-    hook.return_value.generate_job_id.return_value = pytest.real_job_id
-    hook.return_value.split_tablename.return_value = (PROJECT_ID, DATASET, TABLE)
-    hook.return_value.get_job.return_value.result.return_value = ("1",)
-
-    operator = GCSToBigQueryOperator(
-        task_id=TASK_ID,
-        bucket=TEST_BUCKET,
-        source_objects=TEST_SOURCE_OBJECTS,
-        destination_project_dataset_table=TEST_EXPLICIT_DEST,
-        write_disposition=WRITE_DISPOSITION,
-        schema_fields=SCHEMA_FIELDS,
-        external_table=False,
-        autodetect=True,
-        deferrable=True,
-    )
-
-    with pytest.raises(TaskDeferred) as exc:
-        operator.execute(create_context(operator))
-
-    assert isinstance(
-        exc.value.trigger, BigQueryInsertJobTrigger
-    ), "Trigger is not a BigQueryInsertJobTrigger"
-
-
-def test_execute_without_external_table_async_should_throw_ex_when_event_status_error():
-    """
-    Tests that an AirflowException is raised in case of error event.
-    """
-
-    with pytest.raises(AirflowException):
         operator = GCSToBigQueryOperator(
             task_id=TASK_ID,
             bucket=TEST_BUCKET,
@@ -1534,317 +1266,196 @@ def test_execute_without_external_table_async_should_throw_ex_when_event_status_
             destination_project_dataset_table=TEST_EXPLICIT_DEST,
             write_disposition=WRITE_DISPOSITION,
             schema_fields=SCHEMA_FIELDS,
+            location=TEST_DATASET_LOCATION,
+            reattach_states={"PENDING"},
             external_table=False,
             autodetect=True,
             deferrable=True,
         )
-        operator.execute_complete(context=None, event={"status": "error", "message": "test failure message"})
-
-
-def test_execute_logging_without_external_table_async_should_execute_successfully():
-    """
-    Asserts that logging occurs as expected.
-    """
-
-    operator = GCSToBigQueryOperator(
-        task_id=TASK_ID,
-        bucket=TEST_BUCKET,
-        source_objects=TEST_SOURCE_OBJECTS,
-        destination_project_dataset_table=TEST_EXPLICIT_DEST,
-        write_disposition=WRITE_DISPOSITION,
-        schema_fields=SCHEMA_FIELDS,
-        external_table=False,
-        autodetect=True,
-        deferrable=True,
-    )
-    with mock.patch.object(operator.log, "info") as mock_log_info:
-        operator.execute_complete(
-            context=create_context(operator),
-            event={"status": "success", "message": "Job completed", "job_id": 
job_id},
+
+        with pytest.raises(AirflowException) as exc:
+            operator.execute(self.create_context(operator))
+
+        expected_exception_msg = (
+            f"Job with id: {pytest.real_job_id} already exists and is in 
{job.state} state. "
+            f"If you want to force rerun it consider setting 
`force_rerun=True`."
+            f"Or, if you want to reattach in this scenario add {job.state} to 
`reattach_states`"
         )
-    mock_log_info.assert_called_with(
-        "%s completed with response %s ", "test-gcs-to-bq-operator", "Job 
completed"
-    )
-
-
[email protected]("airflow.providers.google.cloud.transfers.gcs_to_bigquery.BigQueryHook")
-def test_execute_without_external_table_generate_job_id_async_should_execute_successfully(hook):
-    hook.return_value.insert_job.side_effect = Conflict("any")
-    hook.return_value.split_tablename.return_value = (PROJECT_ID, DATASET, TABLE)
-    job = MagicMock(
-        job_id=pytest.real_job_id,
-        error_result=False,
-        state="PENDING",
-        done=lambda: False,
-    )
-    hook.return_value.get_job.return_value = job
-
-    operator = GCSToBigQueryOperator(
-        task_id=TASK_ID,
-        bucket=TEST_BUCKET,
-        source_objects=TEST_SOURCE_OBJECTS,
-        destination_project_dataset_table=TEST_EXPLICIT_DEST,
-        write_disposition=WRITE_DISPOSITION,
-        schema_fields=SCHEMA_FIELDS,
-        reattach_states={"PENDING"},
-        external_table=False,
-        autodetect=True,
-        deferrable=True,
-    )
-
-    with pytest.raises(TaskDeferred):
-        operator.execute(create_context(operator))
-
-    hook.return_value.generate_job_id.assert_called_once_with(
-        job_id=None,
-        dag_id="adhoc_airflow",
-        task_id=TASK_ID,
-        logical_date=datetime(2022, 1, 1, 0, 0),
-        configuration={},
-        force_rerun=True,
-    )
-
-
[email protected]("airflow.providers.google.cloud.transfers.gcs_to_bigquery.BigQueryHook")
-def test_execute_without_external_table_reattach_async_should_execute_successfully(hook):
-    hook.return_value.generate_job_id.return_value = pytest.real_job_id
-
-    hook.return_value.insert_job.side_effect = Conflict("any")
-    hook.return_value.split_tablename.return_value = (PROJECT_ID, DATASET, TABLE)
-    job = MagicMock(
-        job_id=pytest.real_job_id,
-        error_result=False,
-        state="PENDING",
-        done=lambda: False,
-    )
-    hook.return_value.get_job.return_value = job
-
-    operator = GCSToBigQueryOperator(
-        task_id=TASK_ID,
-        bucket=TEST_BUCKET,
-        source_objects=TEST_SOURCE_OBJECTS,
-        destination_project_dataset_table=TEST_EXPLICIT_DEST,
-        write_disposition=WRITE_DISPOSITION,
-        schema_fields=SCHEMA_FIELDS,
-        location=TEST_DATASET_LOCATION,
-        reattach_states={"PENDING"},
-        external_table=False,
-        autodetect=True,
-        deferrable=True,
-    )
-
-    with pytest.raises(TaskDeferred):
-        operator.execute(create_context(operator))
-
-    hook.return_value.get_job.assert_called_once_with(
-        location=TEST_DATASET_LOCATION,
-        job_id=pytest.real_job_id,
-        project_id=hook.return_value.project_id,
-    )
-
-    job._begin.assert_called_once_with()
-
-
[email protected]("airflow.providers.google.cloud.transfers.gcs_to_bigquery.BigQueryHook")
-def test_execute_without_external_table_force_rerun_async_should_execute_successfully(hook):
-    hook.return_value.generate_job_id.return_value = f"{job_id}_{hash_}"
-    hook.return_value.split_tablename.return_value = (PROJECT_ID, DATASET, TABLE)
-
-    hook.return_value.insert_job.side_effect = Conflict("any")
-    job = MagicMock(
-        job_id=pytest.real_job_id,
-        error_result=False,
-        state="DONE",
-        done=lambda: False,
-    )
-    hook.return_value.get_job.return_value = job
-
-    operator = GCSToBigQueryOperator(
-        task_id=TASK_ID,
-        bucket=TEST_BUCKET,
-        source_objects=TEST_SOURCE_OBJECTS,
-        destination_project_dataset_table=TEST_EXPLICIT_DEST,
-        write_disposition=WRITE_DISPOSITION,
-        schema_fields=SCHEMA_FIELDS,
-        location=TEST_DATASET_LOCATION,
-        reattach_states={"PENDING"},
-        external_table=False,
-        autodetect=True,
-        deferrable=True,
-    )
-
-    with pytest.raises(AirflowException) as exc:
-        operator.execute(create_context(operator))
-
-    expected_exception_msg = (
-        f"Job with id: {pytest.real_job_id} already exists and is in 
{job.state} state. "
-        f"If you want to force rerun it consider setting `force_rerun=True`."
-        f"Or, if you want to reattach in this scenario add {job.state} to 
`reattach_states`"
-    )
-
-    assert str(exc.value) == expected_exception_msg
-
-    hook.return_value.get_job.assert_called_once_with(
-        location=TEST_DATASET_LOCATION,
-        job_id=pytest.real_job_id,
-        project_id=hook.return_value.project_id,
-    )
-
-
[email protected]("airflow.providers.google.cloud.transfers.gcs_to_bigquery.GCSHook")
[email protected]("airflow.providers.google.cloud.transfers.gcs_to_bigquery.BigQueryHook")
-def test_schema_fields_without_external_table_async_should_execute_successfully(bq_hook, gcs_hook):
-    bq_hook.return_value.insert_job.return_value = MagicMock(job_id=pytest.real_job_id, error_result=False)
-    bq_hook.return_value.generate_job_id.return_value = pytest.real_job_id
-    bq_hook.return_value.split_tablename.return_value = (PROJECT_ID, DATASET, TABLE)
-    bq_hook.return_value.get_job.return_value.result.return_value = ("1",)
-    gcs_hook.return_value.download.return_value = b"id,name\r\none,Anna"
-
-    operator = GCSToBigQueryOperator(
-        task_id=TASK_ID,
-        bucket=TEST_BUCKET,
-        source_objects=TEST_SOURCE_OBJECTS,
-        destination_project_dataset_table=TEST_EXPLICIT_DEST,
-        write_disposition=WRITE_DISPOSITION,
-        schema_fields=SCHEMA_FIELDS,
-        max_id_key=MAX_ID_KEY,
-        external_table=False,
-        autodetect=True,
-        deferrable=True,
-    )
-
-    with pytest.raises(TaskDeferred):
-        result = operator.execute(create_context(operator))
-        assert result == "1"
 
-        calls = [
-            call(
-                configuration={
-                    "load": dict(
-                        autodetect=True,
-                        createDisposition="CREATE_IF_NEEDED",
-                        destinationTable={"projectId": PROJECT_ID, "datasetId": DATASET, "tableId": TABLE},
-                        destinationTableProperties={
-                            "description": None,
-                            "labels": None,
-                        },
-                        sourceFormat="CSV",
-                        skipLeadingRows=None,
-                        sourceUris=[f"gs://{TEST_BUCKET}/{TEST_SOURCE_OBJECTS_AS_STRING}"],
-                        writeDisposition=WRITE_DISPOSITION,
-                        ignoreUnknownValues=False,
-                        allowQuotedNewlines=False,
-                        encoding="UTF-8",
-                        schema={"fields": SCHEMA_FIELDS},
-                    ),
-                },
-                project_id=bq_hook.return_value.project_id,
-                location=None,
-                job_id=pytest.real_job_id,
-                timeout=None,
-                retry=DEFAULT_RETRY,
-                nowait=True,
-            ),
-            call(
-                configuration={
-                    "query": {
-                        "query": f"SELECT MAX({MAX_ID_KEY}) AS max_value FROM 
{TEST_EXPLICIT_DEST}",
-                        "useLegacySql": False,
-                        "schemaUpdateOptions": [],
-                    }
-                },
-                project_id=bq_hook.return_value.project_id,
-            ),
-        ]
+        assert str(exc.value) == expected_exception_msg
 
-        bq_hook.return_value.insert_job.assert_has_calls(calls)
+        hook.return_value.get_job.assert_called_once_with(
+            location=TEST_DATASET_LOCATION,
+            job_id=pytest.real_job_id,
+            project_id=hook.return_value.project_id,
+        )
+
+    @mock.patch(GCS_TO_BQ_PATH.format("GCSHook"))
+    @mock.patch(GCS_TO_BQ_PATH.format("BigQueryHook"))
+    def test_schema_fields_without_external_table_async_should_execute_successfully(self, bq_hook, gcs_hook):
+        bq_hook.return_value.insert_job.return_value = MagicMock(
+            job_id=pytest.real_job_id, error_result=False
+        )
+        bq_hook.return_value.generate_job_id.return_value = pytest.real_job_id
+        bq_hook.return_value.split_tablename.return_value = (PROJECT_ID, DATASET, TABLE)
+        bq_hook.return_value.get_job.return_value.result.return_value = ("1",)
+        gcs_hook.return_value.download.return_value = b"id,name\r\none,Anna"
 
+        operator = GCSToBigQueryOperator(
+            task_id=TASK_ID,
+            bucket=TEST_BUCKET,
+            source_objects=TEST_SOURCE_OBJECTS,
+            destination_project_dataset_table=TEST_EXPLICIT_DEST,
+            write_disposition=WRITE_DISPOSITION,
+            schema_fields=SCHEMA_FIELDS,
+            max_id_key=MAX_ID_KEY,
+            external_table=False,
+            autodetect=True,
+            deferrable=True,
+        )
 
[email protected]("airflow.providers.google.cloud.transfers.gcs_to_bigquery.GCSHook")
[email protected]("airflow.providers.google.cloud.transfers.gcs_to_bigquery.BigQueryHook")
-def test_schema_fields_int_without_external_table_async_should_execute_successfully(bq_hook, gcs_hook):
-    bq_hook.return_value.insert_job.return_value = MagicMock(job_id=pytest.real_job_id, error_result=False)
-    bq_hook.return_value.generate_job_id.return_value = pytest.real_job_id
-    bq_hook.return_value.split_tablename.return_value = (PROJECT_ID, DATASET, TABLE)
-    bq_hook.return_value.get_job.return_value.result.return_value = ("1",)
-    gcs_hook.return_value.download.return_value = b"id,name\r\n1,Anna"
-
-    operator = GCSToBigQueryOperator(
-        task_id=TASK_ID,
-        bucket=TEST_BUCKET,
-        source_objects=TEST_SOURCE_OBJECTS,
-        destination_project_dataset_table=TEST_EXPLICIT_DEST,
-        write_disposition=WRITE_DISPOSITION,
-        schema_fields=SCHEMA_FIELDS,
-        max_id_key=MAX_ID_KEY,
-        external_table=False,
-        autodetect=True,
-        deferrable=True,
-    )
-
-    with pytest.raises(TaskDeferred):
-        result = operator.execute(create_context(operator))
-        assert result == "1"
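+        # execute() submits the load job and then defers by raising TaskDeferred.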
+        with pytest.raises(TaskDeferred):
+            result = operator.execute(self.create_context(operator))
+            assert result == "1"
+
+            calls = [
+                call(
+                    configuration={
+                        "load": dict(
+                            autodetect=True,
+                            createDisposition="CREATE_IF_NEEDED",
+                            destinationTable={
+                                "projectId": PROJECT_ID,
+                                "datasetId": DATASET,
+                                "tableId": TABLE,
+                            },
+                            destinationTableProperties={
+                                "description": None,
+                                "labels": None,
+                            },
+                            sourceFormat="CSV",
+                            skipLeadingRows=None,
+                            sourceUris=[f"gs://{TEST_BUCKET}/{TEST_SOURCE_OBJECTS}"],
+                            writeDisposition=WRITE_DISPOSITION,
+                            ignoreUnknownValues=False,
+                            allowQuotedNewlines=False,
+                            encoding="UTF-8",
+                            schema={"fields": SCHEMA_FIELDS},
+                        ),
+                    },
+                    project_id=bq_hook.return_value.project_id,
+                    location=None,
+                    job_id=pytest.real_job_id,
+                    timeout=None,
+                    retry=DEFAULT_RETRY,
+                    nowait=True,
+                ),
+                call(
+                    configuration={
+                        "query": {
+                            "query": f"SELECT MAX({MAX_ID_KEY}) AS max_value 
FROM {TEST_EXPLICIT_DEST}",
+                            "useLegacySql": False,
+                            "schemaUpdateOptions": [],
+                        }
+                    },
+                    project_id=bq_hook.return_value.project_id,
+                ),
+            ]
 
-        calls = [
-            call(
-                configuration={
-                    "load": dict(
-                        autodetect=True,
-                        createDisposition="CREATE_IF_NEEDED",
-                        destinationTable={"projectId": PROJECT_ID, "datasetId": DATASET, "tableId": TABLE},
-                        destinationTableProperties={
-                            "description": None,
-                            "labels": None,
-                        },
-                        sourceFormat="CSV",
-                        skipLeadingRows=None,
-                        sourceUris=[f"gs://{TEST_BUCKET}/{TEST_SOURCE_OBJECTS_AS_STRING}"],
-                        writeDisposition=WRITE_DISPOSITION,
-                        ignoreUnknownValues=False,
-                        allowQuotedNewlines=False,
-                        encoding="UTF-8",
-                    ),
-                },
-                project_id=bq_hook.return_value.project_id,
-                location=None,
-                job_id=pytest.real_job_id,
-                timeout=None,
-                retry=DEFAULT_RETRY,
-                nowait=True,
-            ),
-            call(
-                configuration={
-                    "query": {
-                        "query": f"SELECT MAX({MAX_ID_KEY}) AS max_value FROM 
{TEST_EXPLICIT_DEST}",
-                        "useLegacySql": False,
-                        "schemaUpdateOptions": [],
-                    }
-                },
-                project_id=bq_hook.return_value.project_id,
-            ),
-        ]
+            bq_hook.return_value.insert_job.assert_has_calls(calls)
 
-        bq_hook.return_value.insert_job.assert_has_calls(calls)
+    @mock.patch(GCS_TO_BQ_PATH.format("GCSHook"))
+    @mock.patch(GCS_TO_BQ_PATH.format("BigQueryHook"))
+    def test_schema_fields_int_without_external_table_async_should_execute_successfully(
+        self, bq_hook, gcs_hook
+    ):
+        bq_hook.return_value.insert_job.return_value = MagicMock(
+            job_id=pytest.real_job_id, error_result=False
+        )
+        bq_hook.return_value.generate_job_id.return_value = pytest.real_job_id
+        bq_hook.return_value.split_tablename.return_value = (PROJECT_ID, DATASET, TABLE)
+        bq_hook.return_value.get_job.return_value.result.return_value = ("1",)
+        gcs_hook.return_value.download.return_value = b"id,name\r\n1,Anna"
 
+        operator = GCSToBigQueryOperator(
+            task_id=TASK_ID,
+            bucket=TEST_BUCKET,
+            source_objects=TEST_SOURCE_OBJECTS,
+            destination_project_dataset_table=TEST_EXPLICIT_DEST,
+            write_disposition=WRITE_DISPOSITION,
+            schema_fields=SCHEMA_FIELDS,
+            max_id_key=MAX_ID_KEY,
+            external_table=False,
+            autodetect=True,
+            deferrable=True,
+        )
 
-def create_context(task):
-    dag = DAG(dag_id="dag")
-    logical_date = datetime(2022, 1, 1, 0, 0, 0)
-    dag_run = DagRun(
-        dag_id=dag.dag_id,
-        execution_date=logical_date,
-        run_id=DagRun.generate_run_id(DagRunType.MANUAL, logical_date),
-    )
-    task_instance = TaskInstance(task=task)
-    task_instance.dag_run = dag_run
-    task_instance.dag_id = dag.dag_id
-    task_instance.xcom_push = mock.Mock()
-    return {
-        "dag": dag,
-        "run_id": dag_run.run_id,
-        "task": task,
-        "ti": task_instance,
-        "task_instance": task_instance,
-        "logical_date": logical_date,
-    }
+        with pytest.raises(TaskDeferred):
+            result = operator.execute(self.create_context(operator))
+            assert result == "1"
+
+            calls = [
+                call(
+                    configuration={
+                        "load": dict(
+                            autodetect=True,
+                            createDisposition="CREATE_IF_NEEDED",
+                            destinationTable={
+                                "projectId": PROJECT_ID,
+                                "datasetId": DATASET,
+                                "tableId": TABLE,
+                            },
+                            destinationTableProperties={
+                                "description": None,
+                                "labels": None,
+                            },
+                            sourceFormat="CSV",
+                            skipLeadingRows=None,
+                            sourceUris=[f"gs://{TEST_BUCKET}/{TEST_SOURCE_OBJECTS}"],
+                            writeDisposition=WRITE_DISPOSITION,
+                            ignoreUnknownValues=False,
+                            allowQuotedNewlines=False,
+                            encoding="UTF-8",
+                        ),
+                    },
+                    project_id=bq_hook.return_value.project_id,
+                    location=None,
+                    job_id=pytest.real_job_id,
+                    timeout=None,
+                    retry=DEFAULT_RETRY,
+                    nowait=True,
+                ),
+                call(
+                    configuration={
+                        "query": {
+                            "query": f"SELECT MAX({MAX_ID_KEY}) AS max_value 
FROM {TEST_EXPLICIT_DEST}",
+                            "useLegacySql": False,
+                            "schemaUpdateOptions": [],
+                        }
+                    },
+                    project_id=bq_hook.return_value.project_id,
+                ),
+            ]
+
+            bq_hook.return_value.insert_job.assert_has_calls(calls)
+
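+    # Builds a minimal execution context (DAG, DagRun, TaskInstance with a
+    # mocked xcom_push) so operator.execute() can run outside a scheduler.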
+    def create_context(self, task):
+        dag = DAG(dag_id="dag")
+        logical_date = datetime(2022, 1, 1, 0, 0, 0)
+        dag_run = DagRun(
+            dag_id=dag.dag_id,
+            execution_date=logical_date,
+            run_id=DagRun.generate_run_id(DagRunType.MANUAL, logical_date),
+        )
+        task_instance = TaskInstance(task=task)
+        task_instance.dag_run = dag_run
+        task_instance.dag_id = dag.dag_id
+        task_instance.xcom_push = mock.Mock()
+        return {
+            "dag": dag,
+            "run_id": dag_run.run_id,
+            "task": task,
+            "ti": task_instance,
+            "task_instance": task_instance,
+            "logical_date": logical_date,
+        }
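
A note on the deferrable test pattern above: in deferrable mode the operator
submits the load job and then raises TaskDeferred to hand control to a
trigger, which is why the tests wrap execute() in pytest.raises. A minimal
standalone sketch of that pattern (the operator fixture and context names are
illustrative, not taken from the test suite):

    import pytest
    from airflow.exceptions import TaskDeferred

    def test_operator_defers(deferrable_operator, context):
        # A deferrable operator hands off by raising TaskDeferred; the
        # exception carries the trigger and the name of the resume method
        # (here, execute_complete).
        with pytest.raises(TaskDeferred) as deferred:
            deferrable_operator.execute(context)
        assert deferred.value.method_name == "execute_complete"
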
diff --git a/tests/system/providers/google/cloud/gcs/example_gcs_to_bigquery_async.py b/tests/system/providers/google/cloud/gcs/example_gcs_to_bigquery_async.py
index ebfebab585..5888056456 100644
--- a/tests/system/providers/google/cloud/gcs/example_gcs_to_bigquery_async.py
+++ b/tests/system/providers/google/cloud/gcs/example_gcs_to_bigquery_async.py
@@ -37,8 +37,12 @@ DAG_ID = "gcs_to_bigquery_operator_async"
 
 DATASET_NAME_STR = f"dataset_{DAG_ID}_{ENV_ID}_STR"
 DATASET_NAME_DATE = f"dataset_{DAG_ID}_{ENV_ID}_DATE"
+DATASET_NAME_JSON = f"dataset_{DAG_ID}_{ENV_ID}_JSON"
+DATASET_NAME_DELIMITER = f"dataset_{DAG_ID}_{ENV_ID}_DELIMITER"
 TABLE_NAME_STR = "test_str"
 TABLE_NAME_DATE = "test_date"
+TABLE_NAME_JSON = "test_json"
+TABLE_NAME_DELIMITER = "test_delimiter"
 MAX_ID_STR = "name"
 MAX_ID_DATE = "date"
 
@@ -49,14 +53,24 @@ with models.DAG(
     catchup=False,
     tags=["example", "gcs"],
 ) as dag:
-    create_test_dataset_for_string_fileds = BigQueryCreateEmptyDatasetOperator(
+    create_test_dataset_for_string_fields = BigQueryCreateEmptyDatasetOperator(
         task_id="create_airflow_test_dataset_str", 
dataset_id=DATASET_NAME_STR, project_id=PROJECT_ID
     )
 
-    create_test_dataset_for_date_fileds = BigQueryCreateEmptyDatasetOperator(
+    create_test_dataset_for_date_fields = BigQueryCreateEmptyDatasetOperator(
         task_id="create_airflow_test_dataset_date", 
dataset_id=DATASET_NAME_DATE, project_id=PROJECT_ID
     )
 
+    create_test_dataset_for_json_fields = BigQueryCreateEmptyDatasetOperator(
+        task_id="create_airflow_test_dataset_json", 
dataset_id=DATASET_NAME_JSON, project_id=PROJECT_ID
+    )
+
+    create_test_dataset_for_delimiter_fields = BigQueryCreateEmptyDatasetOperator(
+        task_id="create_airflow_test_dataset_delimiter",
+        dataset_id=DATASET_NAME_DELIMITER,
+        project_id=PROJECT_ID,
+    )
+
     # [START howto_operator_gcs_to_bigquery_async]
     load_string_based_csv = GCSToBigQueryOperator(
         task_id="gcs_to_bigquery_example_str_csv_async",
@@ -81,6 +95,34 @@ with models.DAG(
         max_id_key=MAX_ID_DATE,
         deferrable=True,
     )
+
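+    # Loads a public newline-delimited JSON sample; with autodetect=True the
+    # schema is inferred by BigQuery instead of being passed explicitly.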
+    load_json = GCSToBigQueryOperator(
+        task_id="gcs_to_bigquery_example_date_json_async",
+        bucket="cloud-samples-data",
+        source_objects=["bigquery/us-states/us-states.json"],
+        source_format="NEWLINE_DELIMITED_JSON",
+        destination_project_dataset_table=f"{DATASET_NAME_JSON}.{TABLE_NAME_JSON}",
+        write_disposition="WRITE_TRUNCATE",
+        external_table=False,
+        autodetect=True,
+        max_id_key=MAX_ID_STR,
+        deferrable=True,
+    )
+
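+    # Loads a tab-separated file; quote_character="" disables quoting so
+    # that quote characters inside fields are read literally.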
+    load_csv_delimiter = GCSToBigQueryOperator(
+        task_id="gcs_to_bigquery_example_delimiter_async",
+        bucket="big-query-samples",
+        source_objects=["employees-tabular.csv"],
+        source_format="csv",
+        destination_project_dataset_table=f"{DATASET_NAME_DELIMITER}.{TABLE_NAME_DELIMITER}",
+        write_disposition="WRITE_TRUNCATE",
+        external_table=False,
+        autodetect=True,
+        field_delimiter="\t",
+        quote_character="",
+        max_id_key=MAX_ID_STR,
+        deferrable=True,
+    )
     # [END howto_operator_gcs_to_bigquery_async]
 
     delete_test_dataset_str = BigQueryDeleteDatasetOperator(
@@ -97,16 +139,36 @@ with models.DAG(
         trigger_rule=TriggerRule.ALL_DONE,
     )
 
+    delete_test_dataset_json = BigQueryDeleteDatasetOperator(
+        task_id="delete_airflow_test_json_dataset",
+        dataset_id=DATASET_NAME_JSON,
+        delete_contents=True,
+        trigger_rule=TriggerRule.ALL_DONE,
+    )
+
+    delete_test_dataset_delimiter = BigQueryDeleteDatasetOperator(
+        task_id="delete_airflow_test_delimiter",
+        dataset_id=DATASET_NAME_DELIMITER,
+        delete_contents=True,
+        trigger_rule=TriggerRule.ALL_DONE,
+    )
+
     (
         # TEST SETUP
-        create_test_dataset_for_string_fileds
-        >> create_test_dataset_for_date_fileds
+        create_test_dataset_for_string_fields
+        >> create_test_dataset_for_date_fields
+        >> create_test_dataset_for_json_fields
+        >> create_test_dataset_for_delimiter_fields
         # TEST BODY
         >> load_string_based_csv
         >> load_date_based_csv
+        >> load_json
+        >> load_csv_delimiter
         # TEST TEARDOWN
         >> delete_test_dataset_str
         >> delete_test_dataset_date
+        >> delete_test_dataset_json
+        >> delete_test_dataset_delimiter
     )
 
     from tests.system.utils.watcher import watcher
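
For reference on the reattach behaviour exercised in the tests above: when
insert_job raises Conflict because a job with the generated id already
exists, the operator fetches that job and either reattaches to it (if its
state is listed in reattach_states) or fails with a hint to set
force_rerun=True. A minimal sketch of opting into reattachment (the bucket,
dataset, and table names below are placeholders):

    from airflow.providers.google.cloud.transfers.gcs_to_bigquery import (
        GCSToBigQueryOperator,
    )

    # Placeholder values; substitute a real bucket/dataset/table.
    load_csv = GCSToBigQueryOperator(
        task_id="load_csv_reattach",
        bucket="my-bucket",
        source_objects=["data/part-*.csv"],
        destination_project_dataset_table="my_dataset.my_table",
        write_disposition="WRITE_TRUNCATE",
        # Reattach to an already-submitted job with the same id instead of
        # failing, as long as that job is still PENDING or RUNNING.
        reattach_states={"PENDING", "RUNNING"},
        deferrable=True,
    )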
