This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 02976be  Refactor: BigQuery to GCS Operator (#22506)
02976be is described below

commit 02976bef885a5da29a8be59b32af51edbf94466c
Author: Shuho Yoshida <shu.st...@gmail.com>
AuthorDate: Mon Mar 28 05:21:35 2022 +0900

    Refactor: BigQuery to GCS Operator (#22506)
---
 airflow/providers/google/cloud/hooks/bigquery.py   |  6 ++--
 .../google/cloud/transfers/bigquery_to_gcs.py      | 36 ++++++----------------
 .../google/cloud/transfers/test_bigquery_to_gcs.py | 32 +++++++------------
 3 files changed, 24 insertions(+), 50 deletions(-)
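
The refactor moves the BigQuery extract-job configuration out of BigQueryToGCSOperator.execute() and back into BigQueryHook.run_extract(), whose destination_cloud_storage_uris parameter is now typed as List[str]. For context, a minimal sketch of calling the hook method directly; the connection id, table, bucket, and label values are placeholders, not part of this commit:

    from airflow.providers.google.cloud.hooks.bigquery import BigQueryHook

    # Uses the default Google Cloud connection; all identifiers below are illustrative.
    hook = BigQueryHook(gcp_conn_id='google_cloud_default')
    hook.run_extract(
        source_project_dataset_table='my-project:my_dataset.my_table',
        destination_cloud_storage_uris=['gs://my-bucket/export-*.csv'],  # now List[str]
        compression='NONE',
        export_format='CSV',
        field_delimiter=',',
        print_header=True,
        labels={'team': 'data'},
    )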

diff --git a/airflow/providers/google/cloud/hooks/bigquery.py b/airflow/providers/google/cloud/hooks/bigquery.py
index 1202c11..3642e8e 100644
--- a/airflow/providers/google/cloud/hooks/bigquery.py
+++ b/airflow/providers/google/cloud/hooks/bigquery.py
@@ -1905,7 +1905,7 @@ class BigQueryHook(GoogleBaseHook, DbApiHook):
     def run_extract(
         self,
         source_project_dataset_table: str,
-        destination_cloud_storage_uris: str,
+        destination_cloud_storage_uris: List[str],
         compression: str = 'NONE',
         export_format: str = 'CSV',
         field_delimiter: str = ',',
@@ -1945,7 +1945,7 @@ class BigQueryHook(GoogleBaseHook, DbApiHook):
             var_name='source_project_dataset_table',
         )
 
-        configuration = {
+        configuration: Dict[str, Any] = {
             'extract': {
                 'sourceTable': {
                     'projectId': source_project,
@@ -1956,7 +1956,7 @@ class BigQueryHook(GoogleBaseHook, DbApiHook):
                 'destinationUris': destination_cloud_storage_uris,
                 'destinationFormat': export_format,
             }
-        }  # type: Dict[str, Any]
+        }
 
         if labels:
             configuration['labels'] = labels
diff --git a/airflow/providers/google/cloud/transfers/bigquery_to_gcs.py b/airflow/providers/google/cloud/transfers/bigquery_to_gcs.py
index 2515f47..69c64d0 100644
--- a/airflow/providers/google/cloud/transfers/bigquery_to_gcs.py
+++ b/airflow/providers/google/cloud/transfers/bigquery_to_gcs.py
@@ -17,9 +17,7 @@
 # under the License.
 """This module contains Google BigQuery to Google Cloud Storage operator."""
 import warnings
-from typing import TYPE_CHECKING, Any, Dict, List, Optional, Sequence, Union
-
-from google.cloud.bigquery.table import TableReference
+from typing import TYPE_CHECKING, Dict, List, Optional, Sequence, Union
 
 from airflow.models import BaseOperator
 from airflow.providers.google.cloud.hooks.bigquery import BigQueryHook
@@ -128,26 +126,12 @@ class BigQueryToGCSOperator(BaseOperator):
             location=self.location,
             impersonation_chain=self.impersonation_chain,
         )
-
-        table_ref = TableReference.from_string(self.source_project_dataset_table, hook.project_id)
-
-        configuration: Dict[str, Any] = {
-            'extract': {
-                'sourceTable': table_ref.to_api_repr(),
-                'compression': self.compression,
-                'destinationUris': self.destination_cloud_storage_uris,
-                'destinationFormat': self.export_format,
-            }
-        }
-
-        if self.labels:
-            configuration['labels'] = self.labels
-
-        if self.export_format == 'CSV':
-            # Only set fieldDelimiter and printHeader fields if using CSV.
-            # Google does not like it if you set these fields for other export
-            # formats.
-            configuration['extract']['fieldDelimiter'] = self.field_delimiter
-            configuration['extract']['printHeader'] = self.print_header
-
-        hook.insert_job(configuration=configuration)
+        hook.run_extract(
+            source_project_dataset_table=self.source_project_dataset_table,
+            destination_cloud_storage_uris=self.destination_cloud_storage_uris,
+            compression=self.compression,
+            export_format=self.export_format,
+            field_delimiter=self.field_delimiter,
+            print_header=self.print_header,
+            labels=self.labels,
+        )
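
Operator behaviour is unchanged by the refactor: execute() now simply delegates to the hook instead of assembling the job configuration itself. A minimal DAG sketch for context, with placeholder project, dataset, and bucket names (not part of this commit):

    import pendulum

    from airflow import DAG
    from airflow.providers.google.cloud.transfers.bigquery_to_gcs import BigQueryToGCSOperator

    with DAG(
        dag_id='example_bigquery_to_gcs',
        start_date=pendulum.datetime(2022, 1, 1, tz='UTC'),
        schedule_interval=None,
    ) as dag:
        BigQueryToGCSOperator(
            task_id='bq_to_gcs',
            source_project_dataset_table='my-project:my_dataset.my_table',
            destination_cloud_storage_uris=['gs://my-bucket/export-*.csv'],
            export_format='CSV',
            print_header=True,
        )
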
diff --git a/tests/providers/google/cloud/transfers/test_bigquery_to_gcs.py b/tests/providers/google/cloud/transfers/test_bigquery_to_gcs.py
index 2ddac81..4542172 100644
--- a/tests/providers/google/cloud/transfers/test_bigquery_to_gcs.py
+++ b/tests/providers/google/cloud/transfers/test_bigquery_to_gcs.py
@@ -27,10 +27,10 @@ TEST_TABLE_ID = 'test-table-id'
 PROJECT_ID = 'test-project-id'
 
 
-class TestBigQueryToCloudStorageOperator(unittest.TestCase):
+class TestBigQueryToGCSOperator(unittest.TestCase):
     @mock.patch('airflow.providers.google.cloud.transfers.bigquery_to_gcs.BigQueryHook')
     def test_execute(self, mock_hook):
-        source_project_dataset_table = f'{TEST_DATASET}.{TEST_TABLE_ID}'
+        source_project_dataset_table = f'{PROJECT_ID}:{TEST_DATASET}.{TEST_TABLE_ID}'
         destination_cloud_storage_uris = ['gs://some-bucket/some-file.txt']
         compression = 'NONE'
         export_format = 'CSV'
@@ -38,24 +38,6 @@ class TestBigQueryToCloudStorageOperator(unittest.TestCase):
         print_header = True
         labels = {'k1': 'v1'}
 
-        mock_hook().project_id = PROJECT_ID
-
-        configuration = {
-            'extract': {
-                'sourceTable': {
-                    'projectId': mock_hook().project_id,
-                    'datasetId': TEST_DATASET,
-                    'tableId': TEST_TABLE_ID,
-                },
-                'compression': compression,
-                'destinationUris': destination_cloud_storage_uris,
-                'destinationFormat': export_format,
-                'fieldDelimiter': field_delimiter,
-                'printHeader': print_header,
-            },
-            'labels': labels,
-        }
-
         operator = BigQueryToGCSOperator(
             task_id=TASK_ID,
             source_project_dataset_table=source_project_dataset_table,
@@ -69,4 +51,12 @@ class TestBigQueryToCloudStorageOperator(unittest.TestCase):
 
         operator.execute(None)
 
-        mock_hook.return_value.insert_job.assert_called_once_with(configuration=configuration)
+        mock_hook.return_value.run_extract.assert_called_once_with(
+            source_project_dataset_table=source_project_dataset_table,
+            destination_cloud_storage_uris=destination_cloud_storage_uris,
+            compression=compression,
+            export_format=export_format,
+            field_delimiter=field_delimiter,
+            print_header=print_header,
+            labels=labels,
+        )
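
The test now verifies that the operator delegates to BigQueryHook.run_extract() rather than re-building and asserting the full insert_job configuration. From a development checkout it can be run on its own with pytest, e.g. pytest tests/providers/google/cloud/transfers/test_bigquery_to_gcs.py.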
