This is an automated email from the ASF dual-hosted git repository.

yhu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new d74a2f53e47 Fix ReadAllFromBigQuery leak temp dataset (#31895)
d74a2f53e47 is described below

commit d74a2f53e474c206ee329bdb747cf9302a221a65
Author: Yi Hu <ya...@google.com>
AuthorDate: Tue Jul 16 11:20:50 2024 -0400

    Fix ReadAllFromBigQuery leak temp dataset (#31895)
    
    * Fix ReadAllFromBigQuery leak temp dataset
    
    * Fix potential duplicate job name
---
 .../apache_beam/io/gcp/bigquery_read_internal.py   | 49 +++++++++++-----------
 .../apache_beam/io/gcp/bigquery_read_it_test.py    |  4 +-
 2 files changed, 27 insertions(+), 26 deletions(-)
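
At a high level, the leak fix is a lifecycle change in the _BigQueryReadSplit DoFn: the BigQueryWrapper is now constructed once in setup() rather than inside process(), and the temporary dataset is dropped in teardown() instead of at the end of process(), so a dataset created for one query request is not orphaned when a later element fails or cleanup never runs. A minimal sketch of that pattern follows, with a hypothetical FakeClient standing in for bigquery_tools.BigQueryWrapper (this is an illustration of the lifecycle hooks, not Beam's actual classes):

    import apache_beam as beam


    class FakeClient:
      """Hypothetical stand-in for bigquery_tools.BigQueryWrapper."""
      def __init__(self):
        self.created_temp_dataset = False

      def create_temporary_dataset(self):
        self.created_temp_dataset = True

      def clean_up_temporary_dataset(self):
        self.created_temp_dataset = False


    class SplitLikeDoFn(beam.DoFn):
      def setup(self):
        # Runs once per DoFn instance, before the first bundle.
        self.client = FakeClient()

      def process(self, element):
        # Create the temp dataset lazily, for the first query element only.
        if not self.client.created_temp_dataset:
          self.client.create_temporary_dataset()
        yield element

      def teardown(self):
        # Best-effort hook when the DoFn instance is discarded; cleaning up
        # here covers the case where process() raises partway through.
        if self.client.created_temp_dataset:
          self.client.clean_up_temporary_dataset()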

diff --git a/sdks/python/apache_beam/io/gcp/bigquery_read_internal.py b/sdks/python/apache_beam/io/gcp/bigquery_read_internal.py
index ce49cd0161d..f74b7dabfb7 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_read_internal.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_read_internal.py
@@ -24,7 +24,7 @@ import collections
 import decimal
 import json
 import logging
-import random
+import secrets
 import time
 import uuid
 from typing import TYPE_CHECKING
@@ -212,7 +212,7 @@ class _BigQueryReadSplit(beam.transforms.DoFn):
     self._source_uuid = unique_id
     self.kms_key = kms_key
     self.project = project
-    self.temp_dataset = temp_dataset or 'bq_read_all_%s' % uuid.uuid4().hex
+    self.temp_dataset = temp_dataset
     self.query_priority = query_priority
     self.bq_io_metadata = None
 
@@ -226,22 +226,27 @@ class _BigQueryReadSplit(beam.transforms.DoFn):
         'temp_dataset': str(self.temp_dataset)
     }
 
-  def _get_temp_dataset(self):
-    if isinstance(self.temp_dataset, str):
-      return DatasetReference(
-          datasetId=self.temp_dataset, projectId=self._get_project())
-    else:
+  def _get_temp_dataset_id(self):
+    if self.temp_dataset is None:
+      return None
+    elif isinstance(self.temp_dataset, DatasetReference):
+      return self.temp_dataset.datasetId
+    elif isinstance(self.temp_dataset, str):
       return self.temp_dataset
+    else:
+      raise ValueError("temp_dataset has to be either str or DatasetReference")
 
-  def process(self,
-              element: 'ReadFromBigQueryRequest') -> Iterable[BoundedSource]:
-    bq = bigquery_tools.BigQueryWrapper(
-        temp_dataset_id=self._get_temp_dataset().datasetId,
+  def setup(self):
+    self.bq = bigquery_tools.BigQueryWrapper(
+        temp_dataset_id=self._get_temp_dataset_id(),
         client=bigquery_tools.BigQueryWrapper._bigquery_client(self.options))
 
+  def process(self,
+              element: 'ReadFromBigQueryRequest') -> Iterable[BoundedSource]:
     if element.query is not None:
-      self._setup_temporary_dataset(bq, element)
-      table_reference = self._execute_query(bq, element)
+      if not self.bq.created_temp_dataset:
+        self._setup_temporary_dataset(self.bq, element)
+      table_reference = self._execute_query(self.bq, element)
     else:
       assert element.table
       table_reference = bigquery_tools.parse_table_reference(
@@ -250,19 +255,21 @@ class _BigQueryReadSplit(beam.transforms.DoFn):
     if not table_reference.projectId:
       table_reference.projectId = self._get_project()
 
-    schema, metadata_list = self._export_files(bq, element, table_reference)
+    schema, metadata_list = self._export_files(
+        self.bq, element, table_reference)
 
     for metadata in metadata_list:
       yield self._create_source(metadata.path, schema)
 
     if element.query is not None:
-      bq._delete_table(
+      self.bq._delete_table(
           table_reference.projectId,
           table_reference.datasetId,
           table_reference.tableId)
 
-    if bq.created_temp_dataset:
-      self._clean_temporary_dataset(bq, element)
+  def teardown(self):
+    if self.bq.created_temp_dataset:
+      self.bq.clean_up_temporary_dataset(self._get_project())
 
   def _get_bq_metadata(self):
     if not self.bq_io_metadata:
@@ -288,12 +295,6 @@ class _BigQueryReadSplit(beam.transforms.DoFn):
         self._get_project(), element.query, not element.use_standard_sql)
     bq.create_temporary_dataset(self._get_project(), location)
 
-  def _clean_temporary_dataset(
-      self,
-      bq: bigquery_tools.BigQueryWrapper,
-      element: 'ReadFromBigQueryRequest'):
-    bq.clean_up_temporary_dataset(self._get_project())
-
   def _execute_query(
       self,
       bq: bigquery_tools.BigQueryWrapper,
@@ -302,7 +303,7 @@ class _BigQueryReadSplit(beam.transforms.DoFn):
         self._job_name,
         self._source_uuid,
         bigquery_tools.BigQueryJobTypes.QUERY,
-        '%s_%s' % (int(time.time()), random.randint(0, 1000)))
+        '%s_%s' % (int(time.time()), secrets.token_hex(3)))
     job = bq._start_query_job(
         self._get_project(),
         element.query,
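
The job-name change in the hunk above addresses the duplicate-name issue: random.randint(0, 1000) allows only 1001 distinct suffixes, so two splits starting a query job in the same second can easily collide, while secrets.token_hex(3) draws three bytes from the OS CSPRNG, giving 2^24 possible suffixes with no dependence on a shared seeded RNG. Roughly what the generated suffix looks like (output value is illustrative):

    import secrets
    import time

    # Three random bytes render as six hex characters, e.g. 'a3f09c'.
    suffix = '%s_%s' % (int(time.time()), secrets.token_hex(3))
    print(suffix)  # e.g. '1721142050_a3f09c'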
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_read_it_test.py b/sdks/python/apache_beam/io/gcp/bigquery_read_it_test.py
index d56a4c76471..913d6e078d8 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_read_it_test.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_read_it_test.py
@@ -109,11 +109,11 @@ class BigQueryReadIntegrationTests(unittest.TestCase):
     request = bigquery.BigqueryDatasetsDeleteRequest(
         projectId=cls.project, datasetId=cls.dataset_id, deleteContents=True)
     try:
-      _LOGGER.info(
+      _LOGGER.debug(
           "Deleting dataset %s in project %s", cls.dataset_id, cls.project)
       cls.bigquery_client.client.datasets.Delete(request)
     except HttpError:
-      _LOGGER.debug(
+      _LOGGER.warning(
           'Failed to clean up dataset %s in project %s',
           cls.dataset_id,
           cls.project)
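
The test change swaps the two log levels to match their audience: the routine deletion attempt drops to DEBUG, while a cleanup failure, which leaves a dataset behind, is promoted to WARNING. A minimal sketch of the resulting convention, using a hypothetical delete_fn callable rather than the real BigQuery client:

    import logging

    _LOGGER = logging.getLogger(__name__)


    def delete_dataset(delete_fn, project, dataset_id):
      # Routine progress goes to DEBUG; it only matters when tracing a run.
      _LOGGER.debug('Deleting dataset %s in project %s', dataset_id, project)
      try:
        delete_fn(project, dataset_id)  # hypothetical deletion callable
      except Exception:
        # A dataset left behind is actionable, so surface it at WARNING.
        _LOGGER.warning(
            'Failed to clean up dataset %s in project %s', dataset_id, project)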
