This is an automated email from the ASF dual-hosted git repository.

yhu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
     new d74a2f53e47 Fix ReadAllFromBigQuery leak temp dataset (#31895)
d74a2f53e47 is described below

commit d74a2f53e474c206ee329bdb747cf9302a221a65
Author: Yi Hu <ya...@google.com>
AuthorDate: Tue Jul 16 11:20:50 2024 -0400

    Fix ReadAllFromBigQuery leak temp dataset (#31895)

    * Fix ReadAllFromBigQuery leak temp dataset

    * Fix potential duplicate job name
---
 .../apache_beam/io/gcp/bigquery_read_internal.py | 49 +++++++++++-----------
 .../apache_beam/io/gcp/bigquery_read_it_test.py  |  4 +-
 2 files changed, 27 insertions(+), 26 deletions(-)

diff --git a/sdks/python/apache_beam/io/gcp/bigquery_read_internal.py b/sdks/python/apache_beam/io/gcp/bigquery_read_internal.py
index ce49cd0161d..f74b7dabfb7 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_read_internal.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_read_internal.py
@@ -24,7 +24,7 @@ import collections
 import decimal
 import json
 import logging
-import random
+import secrets
 import time
 import uuid
 from typing import TYPE_CHECKING
@@ -212,7 +212,7 @@ class _BigQueryReadSplit(beam.transforms.DoFn):
     self._source_uuid = unique_id
     self.kms_key = kms_key
     self.project = project
-    self.temp_dataset = temp_dataset or 'bq_read_all_%s' % uuid.uuid4().hex
+    self.temp_dataset = temp_dataset
     self.query_priority = query_priority
     self.bq_io_metadata = None
 
@@ -226,22 +226,27 @@ class _BigQueryReadSplit(beam.transforms.DoFn):
         'temp_dataset': str(self.temp_dataset)
     }
 
-  def _get_temp_dataset(self):
-    if isinstance(self.temp_dataset, str):
-      return DatasetReference(
-          datasetId=self.temp_dataset, projectId=self._get_project())
-    else:
+  def _get_temp_dataset_id(self):
+    if self.temp_dataset is None:
+      return None
+    elif isinstance(self.temp_dataset, DatasetReference):
+      return self.temp_dataset.datasetId
+    elif isinstance(self.temp_dataset, str):
       return self.temp_dataset
+    else:
+      raise ValueError("temp_dataset has to be either str or DatasetReference")
 
-  def process(self,
-              element: 'ReadFromBigQueryRequest') -> Iterable[BoundedSource]:
-    bq = bigquery_tools.BigQueryWrapper(
-        temp_dataset_id=self._get_temp_dataset().datasetId,
+  def setup(self):
+    self.bq = bigquery_tools.BigQueryWrapper(
+        temp_dataset_id=self._get_temp_dataset_id(),
         client=bigquery_tools.BigQueryWrapper._bigquery_client(self.options))
 
+  def process(self,
+              element: 'ReadFromBigQueryRequest') -> Iterable[BoundedSource]:
     if element.query is not None:
-      self._setup_temporary_dataset(bq, element)
-      table_reference = self._execute_query(bq, element)
+      if not self.bq.created_temp_dataset:
+        self._setup_temporary_dataset(self.bq, element)
+      table_reference = self._execute_query(self.bq, element)
     else:
       assert element.table
       table_reference = bigquery_tools.parse_table_reference(
@@ -250,19 +255,21 @@ class _BigQueryReadSplit(beam.transforms.DoFn):
     if not table_reference.projectId:
       table_reference.projectId = self._get_project()
 
-    schema, metadata_list = self._export_files(bq, element, table_reference)
+    schema, metadata_list = self._export_files(
+        self.bq, element, table_reference)
 
     for metadata in metadata_list:
       yield self._create_source(metadata.path, schema)
 
     if element.query is not None:
-      bq._delete_table(
+      self.bq._delete_table(
           table_reference.projectId,
           table_reference.datasetId,
           table_reference.tableId)
 
-    if bq.created_temp_dataset:
-      self._clean_temporary_dataset(bq, element)
+  def teardown(self):
+    if self.bq.created_temp_dataset:
+      self.bq.clean_up_temporary_dataset(self._get_project())
 
   def _get_bq_metadata(self):
     if not self.bq_io_metadata:
@@ -288,12 +295,6 @@ class _BigQueryReadSplit(beam.transforms.DoFn):
           self._get_project(), element.query, not element.use_standard_sql)
     bq.create_temporary_dataset(self._get_project(), location)
 
-  def _clean_temporary_dataset(
-      self,
-      bq: bigquery_tools.BigQueryWrapper,
-      element: 'ReadFromBigQueryRequest'):
-    bq.clean_up_temporary_dataset(self._get_project())
-
   def _execute_query(
       self,
       bq: bigquery_tools.BigQueryWrapper,
@@ -302,7 +303,7 @@ class _BigQueryReadSplit(beam.transforms.DoFn):
         self._job_name,
         self._source_uuid,
         bigquery_tools.BigQueryJobTypes.QUERY,
-        '%s_%s' % (int(time.time()), random.randint(0, 1000)))
+        '%s_%s' % (int(time.time()), secrets.token_hex(3)))
     job = bq._start_query_job(
         self._get_project(),
         element.query,
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_read_it_test.py b/sdks/python/apache_beam/io/gcp/bigquery_read_it_test.py
index d56a4c76471..913d6e078d8 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_read_it_test.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_read_it_test.py
@@ -109,11 +109,11 @@ class BigQueryReadIntegrationTests(unittest.TestCase):
     request = bigquery.BigqueryDatasetsDeleteRequest(
         projectId=cls.project, datasetId=cls.dataset_id, deleteContents=True)
     try:
-      _LOGGER.info(
+      _LOGGER.debug(
          "Deleting dataset %s in project %s", cls.dataset_id, cls.project)
       cls.bigquery_client.client.datasets.Delete(request)
     except HttpError:
-      _LOGGER.debug(
+      _LOGGER.warning(
          'Failed to clean up dataset %s in project %s',
          cls.dataset_id,
          cls.project)
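
Note (not part of the commit above): the fix moves resource ownership into the
Beam DoFn lifecycle, creating the BigQueryWrapper once in setup() and cleaning
up the temporary dataset in teardown() instead of inside process(). A minimal,
hypothetical sketch of that pattern is below; the class and attribute names are
illustrative only, and teardown() is best-effort in Beam.

import apache_beam as beam


class _ResourceScopedDoFn(beam.DoFn):
  """Illustrative only: a per-worker resource owned by setup()/teardown()."""

  def setup(self):
    # Runs once per DoFn instance before any bundles are processed.
    self._resource = object()  # stand-in for a client such as BigQueryWrapper

  def process(self, element):
    # process() may run many times and may be retried; it uses the resource
    # but no longer owns its creation or cleanup.
    yield element

  def teardown(self):
    # Runs when the DoFn instance is discarded, so cleanup is attempted even
    # if an individual process() call failed partway through.
    self._resource = None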