This is an automated email from the ASF dual-hosted git repository. riteshghorse pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push: new 8afaa650b04 [Python] Create BigQuery wrapper with pipeline options for ReadAllFromBigQuery (#27599) 8afaa650b04 is described below commit 8afaa650b04427f888ff727184774489967e78c7 Author: Ahmed Abualsaud <65791736+ahmedab...@users.noreply.github.com> AuthorDate: Mon Jul 24 14:46:38 2023 +0000 [Python] Create BigQuery wrapper with pipeline options for ReadAllFromBigQuery (#27599) --- sdks/python/apache_beam/io/gcp/bigquery_read_internal.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/sdks/python/apache_beam/io/gcp/bigquery_read_internal.py b/sdks/python/apache_beam/io/gcp/bigquery_read_internal.py index 0ca5c2e69a0..6841de1e26f 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery_read_internal.py +++ b/sdks/python/apache_beam/io/gcp/bigquery_read_internal.py @@ -150,7 +150,8 @@ class _PassThroughThenCleanupTempDatasets(PTransform): class CleanUpProjects(beam.DoFn): def process(self, unused_element, unused_signal, pipeline_details): - bq = bigquery_tools.BigQueryWrapper() + bq = bigquery_tools.BigQueryWrapper.from_pipeline_options( + input.pipeline.options) pipeline_details = pipeline_details[0] if 'temp_table_ref' in pipeline_details.keys(): temp_table_ref = pipeline_details['temp_table_ref'] @@ -230,7 +231,8 @@ class _BigQueryReadSplit(beam.transforms.DoFn): def process(self, element: 'ReadFromBigQueryRequest') -> Iterable[BoundedSource]: bq = bigquery_tools.BigQueryWrapper( - temp_dataset_id=self._get_temp_dataset().datasetId) + temp_dataset_id=self._get_temp_dataset().datasetId, + client=bigquery_tools.BigQueryWrapper._bigquery_client(self.options)) if element.query is not None: self._setup_temporary_dataset(bq, element)