This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
     new 02976be  Refactor: BigQuery to GCS Operator (#22506)
02976be is described below

commit 02976bef885a5da29a8be59b32af51edbf94466c
Author: Shuho Yoshida <shu.st...@gmail.com>
AuthorDate: Mon Mar 28 05:21:35 2022 +0900

    Refactor: BigQuery to GCS Operator (#22506)
---
 airflow/providers/google/cloud/hooks/bigquery.py    |  6 ++--
 .../google/cloud/transfers/bigquery_to_gcs.py       | 36 ++++++----------------
 .../google/cloud/transfers/test_bigquery_to_gcs.py  | 32 +++++++------------
 3 files changed, 24 insertions(+), 50 deletions(-)

diff --git a/airflow/providers/google/cloud/hooks/bigquery.py b/airflow/providers/google/cloud/hooks/bigquery.py
index 1202c11..3642e8e 100644
--- a/airflow/providers/google/cloud/hooks/bigquery.py
+++ b/airflow/providers/google/cloud/hooks/bigquery.py
@@ -1905,7 +1905,7 @@ class BigQueryHook(GoogleBaseHook, DbApiHook):
     def run_extract(
         self,
         source_project_dataset_table: str,
-        destination_cloud_storage_uris: str,
+        destination_cloud_storage_uris: List[str],
         compression: str = 'NONE',
         export_format: str = 'CSV',
         field_delimiter: str = ',',
@@ -1945,7 +1945,7 @@ class BigQueryHook(GoogleBaseHook, DbApiHook):
             var_name='source_project_dataset_table',
         )
 
-        configuration = {
+        configuration: Dict[str, Any] = {
             'extract': {
                 'sourceTable': {
                     'projectId': source_project,
@@ -1956,7 +1956,7 @@ class BigQueryHook(GoogleBaseHook, DbApiHook):
                 'destinationUris': destination_cloud_storage_uris,
                 'destinationFormat': export_format,
             }
-        }  # type: Dict[str, Any]
+        }
 
         if labels:
             configuration['labels'] = labels
diff --git a/airflow/providers/google/cloud/transfers/bigquery_to_gcs.py b/airflow/providers/google/cloud/transfers/bigquery_to_gcs.py
index 2515f47..69c64d0 100644
--- a/airflow/providers/google/cloud/transfers/bigquery_to_gcs.py
+++ b/airflow/providers/google/cloud/transfers/bigquery_to_gcs.py
@@ -17,9 +17,7 @@
 # under the License.
 """This module contains Google BigQuery to Google Cloud Storage operator."""
 import warnings
-from typing import TYPE_CHECKING, Any, Dict, List, Optional, Sequence, Union
-
-from google.cloud.bigquery.table import TableReference
+from typing import TYPE_CHECKING, Dict, List, Optional, Sequence, Union
 
 from airflow.models import BaseOperator
 from airflow.providers.google.cloud.hooks.bigquery import BigQueryHook
@@ -128,26 +126,12 @@ class BigQueryToGCSOperator(BaseOperator):
             location=self.location,
             impersonation_chain=self.impersonation_chain,
         )
-
-        table_ref = TableReference.from_string(self.source_project_dataset_table, hook.project_id)
-
-        configuration: Dict[str, Any] = {
-            'extract': {
-                'sourceTable': table_ref.to_api_repr(),
-                'compression': self.compression,
-                'destinationUris': self.destination_cloud_storage_uris,
-                'destinationFormat': self.export_format,
-            }
-        }
-
-        if self.labels:
-            configuration['labels'] = self.labels
-
-        if self.export_format == 'CSV':
-            # Only set fieldDelimiter and printHeader fields if using CSV.
-            # Google does not like it if you set these fields for other export
-            # formats.
-            configuration['extract']['fieldDelimiter'] = self.field_delimiter
-            configuration['extract']['printHeader'] = self.print_header
-
-        hook.insert_job(configuration=configuration)
+        hook.run_extract(
+            source_project_dataset_table=self.source_project_dataset_table,
+            destination_cloud_storage_uris=self.destination_cloud_storage_uris,
+            compression=self.compression,
+            export_format=self.export_format,
+            field_delimiter=self.field_delimiter,
+            print_header=self.print_header,
+            labels=self.labels,
+        )
diff --git a/tests/providers/google/cloud/transfers/test_bigquery_to_gcs.py b/tests/providers/google/cloud/transfers/test_bigquery_to_gcs.py
index 2ddac81..4542172 100644
--- a/tests/providers/google/cloud/transfers/test_bigquery_to_gcs.py
+++ b/tests/providers/google/cloud/transfers/test_bigquery_to_gcs.py
@@ -27,10 +27,10 @@ TEST_TABLE_ID = 'test-table-id'
 PROJECT_ID = 'test-project-id'
 
 
-class TestBigQueryToCloudStorageOperator(unittest.TestCase):
+class TestBigQueryToGCSOperator(unittest.TestCase):
     @mock.patch('airflow.providers.google.cloud.transfers.bigquery_to_gcs.BigQueryHook')
     def test_execute(self, mock_hook):
-        source_project_dataset_table = f'{TEST_DATASET}.{TEST_TABLE_ID}'
+        source_project_dataset_table = f'{PROJECT_ID}:{TEST_DATASET}.{TEST_TABLE_ID}'
         destination_cloud_storage_uris = ['gs://some-bucket/some-file.txt']
         compression = 'NONE'
         export_format = 'CSV'
@@ -38,24 +38,6 @@ class TestBigQueryToGCSOperator(unittest.TestCase):
         print_header = True
         labels = {'k1': 'v1'}
 
-        mock_hook().project_id = PROJECT_ID
-
-        configuration = {
-            'extract': {
-                'sourceTable': {
-                    'projectId': mock_hook().project_id,
-                    'datasetId': TEST_DATASET,
-                    'tableId': TEST_TABLE_ID,
-                },
-                'compression': compression,
-                'destinationUris': destination_cloud_storage_uris,
-                'destinationFormat': export_format,
-                'fieldDelimiter': field_delimiter,
-                'printHeader': print_header,
-            },
-            'labels': labels,
-        }
-
         operator = BigQueryToGCSOperator(
             task_id=TASK_ID,
             source_project_dataset_table=source_project_dataset_table,
@@ -69,4 +51,12 @@ class TestBigQueryToCloudStorageOperator(unittest.TestCase):
 
         operator.execute(None)
 
-        mock_hook.return_value.insert_job.assert_called_once_with(configuration=configuration)
+        mock_hook.return_value.run_extract.assert_called_once_with(
+            source_project_dataset_table=source_project_dataset_table,
+            destination_cloud_storage_uris=destination_cloud_storage_uris,
+            compression=compression,
+            export_format=export_format,
+            field_delimiter=field_delimiter,
+            print_header=print_header,
+            labels=labels,
+        )
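
For illustration, a minimal sketch of how the refactored operator can be used in a DAG. The DAG id, project, dataset, table, bucket, and label values below are hypothetical and not part of this commit; only the operator name, import path, and parameter names come from the diff above. After the refactor, execute() forwards these arguments to BigQueryHook.run_extract() instead of building the extract job configuration itself.

    # Hypothetical usage sketch: identifiers below are made up for illustration.
    from datetime import datetime

    from airflow import DAG
    from airflow.providers.google.cloud.transfers.bigquery_to_gcs import BigQueryToGCSOperator

    with DAG(
        dag_id="example_bigquery_to_gcs",
        start_date=datetime(2022, 3, 1),
        schedule_interval=None,
        catchup=False,
    ) as dag:
        export_table = BigQueryToGCSOperator(
            task_id="bq_to_gcs",
            # project:dataset.table, the same format exercised in the updated test
            source_project_dataset_table="my-project:my_dataset.my_table",
            destination_cloud_storage_uris=["gs://my-bucket/export/data-*.csv"],
            export_format="CSV",
            compression="NONE",
            field_delimiter=",",  # only relevant for CSV exports
            print_header=True,
            labels={"team": "analytics"},
        )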