kaxil closed pull request #3880: [AIRFLOW-461] Support autodetected schemas in BigQuery run_load. URL: https://github.com/apache/incubator-airflow/pull/3880
This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/airflow/contrib/hooks/bigquery_hook.py b/airflow/contrib/hooks/bigquery_hook.py index 7a1631b53a..a4d91769c6 100644 --- a/airflow/contrib/hooks/bigquery_hook.py +++ b/airflow/contrib/hooks/bigquery_hook.py @@ -851,8 +851,8 @@ def run_copy(self, def run_load(self, destination_project_dataset_table, - schema_fields, source_uris, + schema_fields=None, source_format='CSV', create_disposition='CREATE_IF_NEEDED', skip_leading_rows=0, @@ -866,7 +866,8 @@ def run_load(self, schema_update_options=(), src_fmt_configs=None, time_partitioning=None, - cluster_fields=None): + cluster_fields=None, + autodetect=False): """ Executes a BigQuery load command to load data from Google Cloud Storage to BigQuery. See here: @@ -884,7 +885,11 @@ def run_load(self, :type destination_project_dataset_table: str :param schema_fields: The schema field list as defined here: https://cloud.google.com/bigquery/docs/reference/v2/jobs#configuration.load + Required if autodetect=False; optional if autodetect=True. :type schema_fields: list + :param autodetect: Attempt to autodetect the schema for CSV and JSON + source files. + :type autodetect: bool :param source_uris: The source Google Cloud Storage URI (e.g. gs://some-bucket/some-file.txt). A single wild per-object name can be used. 
@@ -941,6 +946,11 @@ def run_load(self, # if it's not, we raise a ValueError # Refer to this link for more details: # https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs#configuration.query.tableDefinitions.(key).sourceFormat + + if schema_fields is None and not autodetect: + raise ValueError( + 'You must either pass a schema or autodetect=True.') + if src_fmt_configs is None: src_fmt_configs = {} @@ -975,6 +985,7 @@ def run_load(self, configuration = { 'load': { + 'autodetect': autodetect, 'createDisposition': create_disposition, 'destinationTable': { 'projectId': destination_project, @@ -1592,7 +1603,7 @@ def _split_tablename(table_input, default_project_id, var_name=None): if '.' not in table_input: raise ValueError( - 'Expected deletion_dataset_table name in the format of ' + 'Expected target table name in the format of ' '<dataset>.<table>. Got: {}'.format(table_input)) if not default_project_id: diff --git a/airflow/contrib/operators/bigquery_operator.py b/airflow/contrib/operators/bigquery_operator.py index fec877db05..caed3befed 100644 --- a/airflow/contrib/operators/bigquery_operator.py +++ b/airflow/contrib/operators/bigquery_operator.py @@ -308,7 +308,7 @@ def __init__(self, project_id=None, schema_fields=None, gcs_schema_object=None, - time_partitioning={}, + time_partitioning=None, bigquery_conn_id='bigquery_default', google_cloud_storage_conn_id='google_cloud_default', delegate_to=None, @@ -325,7 +325,7 @@ def __init__(self, self.bigquery_conn_id = bigquery_conn_id self.google_cloud_storage_conn_id = google_cloud_storage_conn_id self.delegate_to = delegate_to - self.time_partitioning = time_partitioning + self.time_partitioning = {} if time_partitioning is None else time_partitioning self.labels = labels def execute(self, context): diff --git a/airflow/contrib/operators/gcs_to_bq.py b/airflow/contrib/operators/gcs_to_bq.py index 39dff21606..a98e15a8d6 100644 --- a/airflow/contrib/operators/gcs_to_bq.py +++ 
b/airflow/contrib/operators/gcs_to_bq.py @@ -152,6 +152,7 @@ def __init__(self, external_table=False, time_partitioning=None, cluster_fields=None, + autodetect=False, *args, **kwargs): super(GoogleCloudStorageToBigQueryOperator, self).__init__(*args, **kwargs) @@ -190,20 +191,24 @@ def __init__(self, self.src_fmt_configs = src_fmt_configs self.time_partitioning = time_partitioning self.cluster_fields = cluster_fields + self.autodetect = autodetect def execute(self, context): bq_hook = BigQueryHook(bigquery_conn_id=self.bigquery_conn_id, delegate_to=self.delegate_to) - if not self.schema_fields and \ - self.schema_object and \ - self.source_format != 'DATASTORE_BACKUP': - gcs_hook = GoogleCloudStorageHook( - google_cloud_storage_conn_id=self.google_cloud_storage_conn_id, - delegate_to=self.delegate_to) - schema_fields = json.loads(gcs_hook.download( - self.bucket, - self.schema_object).decode("utf-8")) + if not self.schema_fields: + if self.schema_object and self.source_format != 'DATASTORE_BACKUP': + gcs_hook = GoogleCloudStorageHook( + google_cloud_storage_conn_id=self.google_cloud_storage_conn_id, + delegate_to=self.delegate_to) + schema_fields = json.loads(gcs_hook.download( + self.bucket, + self.schema_object).decode("utf-8")) + elif self.schema_object is None and self.autodetect is False: + raise ValueError('At least one of `schema_fields`, `schema_object`, ' + 'or `autodetect` must be passed.') + else: schema_fields = self.schema_fields @@ -234,6 +239,7 @@ def execute(self, context): schema_fields=schema_fields, source_uris=source_uris, source_format=self.source_format, + autodetect=self.autodetect, create_disposition=self.create_disposition, skip_leading_rows=self.skip_leading_rows, write_disposition=self.write_disposition, diff --git a/tests/contrib/hooks/test_bigquery_hook.py b/tests/contrib/hooks/test_bigquery_hook.py index e1379dde79..9a46212fb3 100644 --- a/tests/contrib/hooks/test_bigquery_hook.py +++ b/tests/contrib/hooks/test_bigquery_hook.py @@ 
-358,6 +358,14 @@ def run_with_config(config): mocked_rwc.assert_called_once() + @mock.patch.object(hook.BigQueryBaseCursor, 'run_with_configuration') + def test_run_with_auto_detect(self, run_with_config): + destination_project_dataset_table = "autodetect.table" + cursor = hook.BigQueryBaseCursor(mock.Mock(), "project_id") + cursor.run_load(destination_project_dataset_table, [], [], autodetect=True) + args, kwargs = run_with_config.call_args + self.assertIs(args[0]['load']['autodetect'], True) + @mock.patch("airflow.contrib.hooks.bigquery_hook.LoggingMixin") @mock.patch("airflow.contrib.hooks.bigquery_hook.time") @mock.patch.object(hook.BigQueryBaseCursor, 'run_with_configuration') ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services