This is an automated email from the ASF dual-hosted git repository.

yhu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 1842dd2f07a Respect BigQuery insert byte size limits (#24979)
1842dd2f07a is described below

commit 1842dd2f07aee9485c37333882c2f9d1e80f880c
Author: Ahmed Abualsaud <65791736+ahmedab...@users.noreply.github.com>
AuthorDate: Wed Jul 5 17:03:59 2023 +0000

    Respect BigQuery insert byte size limits (#24979)
    
    * limit insert request by bytes; unit test
    
    * make limit checks before adding row; log a warning when seeing a large row
    
    * send large rows to DLQ
    
    * fix tests
    
    * use number instead of variable
    
    * use global variable
    
    * use number
    
    * use global variable
    
    * use number
    
    * expose max payload size as a parameter
    
    * syntax fix
    
    * remove warning log; revert irrelevant tests
    
    * maintain buffer byte size for each destination
    
    * only delete buffer if it exists
    
    * attribute name change to indicate private
    
    * check user specified maximum is within allowed limits
---
 sdks/python/apache_beam/io/gcp/bigquery.py      | 74 ++++++++++++++++++++++---
 sdks/python/apache_beam/io/gcp/bigquery_test.py | 32 +++++++++++
 2 files changed, 98 insertions(+), 8 deletions(-)
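
For context, a minimal usage sketch (not part of the commit) of the new
max_insert_payload_size parameter introduced in the diff below. The table,
schema and rows are hypothetical; the default limit, MAX_INSERT_PAYLOAD_SIZE,
is 9 << 20 bytes (9 MiB), leaving headroom under BigQuery's 10 MB streaming
insert quota, and values above that default are rejected with a ValueError.

    import apache_beam as beam
    from apache_beam.io.gcp.bigquery import WriteToBigQuery

    # Hypothetical rows matching the 'columnA:STRING' schema used below.
    rows = [{'columnA': 'value1'}, {'columnA': 'value2'}]

    with beam.Pipeline() as p:
        _ = (
            p
            | beam.Create(rows)
            | WriteToBigQuery(
                table='project:dataset.table',  # hypothetical destination
                method='STREAMING_INSERTS',
                schema='columnA:STRING',
                # New in this commit: cap the byte size of each streaming
                # insert request; the buffered batch is flushed before adding
                # a row would push it past this cap.
                max_insert_payload_size=5 << 20))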

diff --git a/sdks/python/apache_beam/io/gcp/bigquery.py b/sdks/python/apache_beam/io/gcp/bigquery.py
index d12dd276a1a..bcc04acb9a9 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery.py
@@ -371,6 +371,7 @@ from typing import Tuple
 from typing import Union
 
 import fastavro
+from objsize import get_deep_size
 
 import apache_beam as beam
 from apache_beam import coders
@@ -474,6 +475,13 @@ tried for a very long time. You may reduce this property to reduce the number
 of retries.
 """
 MAX_INSERT_RETRIES = 10000
+"""
+The maximum byte size for a BigQuery legacy streaming insert payload.
+
+Note: The actual limit is 10MB, but we set it to 9MB to make room for request
+overhead: https://cloud.google.com/bigquery/quotas#streaming_inserts
+"""
+MAX_INSERT_PAYLOAD_SIZE = 9 << 20
 
 
 @deprecated(since='2.11.0', current="bigquery_tools.parse_table_reference")
@@ -1325,7 +1333,8 @@ class BigQueryWriteFn(DoFn):
       ignore_insert_ids=False,
       with_batched_input=False,
       ignore_unknown_columns=False,
-      max_retries=MAX_INSERT_RETRIES):
+      max_retries=MAX_INSERT_RETRIES,
+      max_insert_payload_size=MAX_INSERT_PAYLOAD_SIZE):
     """Initialize a WriteToBigQuery transform.
 
     Args:
@@ -1376,7 +1385,8 @@ class BigQueryWriteFn(DoFn):
       max_retries: The number of times that we will retry inserting a group of
         rows into BigQuery. By default, we retry 10000 times with exponential
         backoffs (effectively retry forever).
-
+      max_insert_payload_size: The maximum byte size for a BigQuery legacy
+        streaming insert payload.
     """
     self.schema = schema
     self.test_client = test_client
@@ -1414,6 +1424,7 @@ class BigQueryWriteFn(DoFn):
         BigQueryWriteFn.STREAMING_API_LOGGING_FREQUENCY_SEC)
     self.ignore_unknown_columns = ignore_unknown_columns
     self._max_retries = max_retries
+    self._max_insert_payload_size = max_insert_payload_size
 
   def display_data(self):
     return {
@@ -1429,6 +1440,7 @@ class BigQueryWriteFn(DoFn):
 
   def _reset_rows_buffer(self):
     self._rows_buffer = collections.defaultdict(lambda: [])
+    self._destination_buffer_byte_size = collections.defaultdict(lambda: 0)
 
   @staticmethod
   def get_table_schema(schema):
@@ -1515,11 +1527,42 @@ class BigQueryWriteFn(DoFn):
 
     if not self.with_batched_input:
       row_and_insert_id = element[1]
+      row_byte_size = get_deep_size(row_and_insert_id)
+
+      # send large rows that exceed BigQuery insert limits to DLQ
+      if row_byte_size >= self._max_insert_payload_size:
+        row_mb_size = row_byte_size / 1_000_000
+        max_mb_size = self._max_insert_payload_size / 1_000_000
+        error = (
+            f"Received row with size {row_mb_size}MB that exceeds "
+            f"the maximum insert payload size set ({max_mb_size}MB).")
+        return [
+            pvalue.TaggedOutput(
+                BigQueryWriteFn.FAILED_ROWS_WITH_ERRORS,
+                GlobalWindows.windowed_value(
+                    (destination, row_and_insert_id[0], error))),
+            pvalue.TaggedOutput(
+                BigQueryWriteFn.FAILED_ROWS,
+                GlobalWindows.windowed_value(
+                    (destination, row_and_insert_id[0])))
+        ]
+
+      # Flush the current batch first if adding this row would exceed either
+      # limit: the payload byte size or the number of rows.
+      if ((self._destination_buffer_byte_size[destination] + row_byte_size >
+           self._max_insert_payload_size) or
+          len(self._rows_buffer[destination]) >= self._max_batch_size):
+        flushed_batch = self._flush_batch(destination)
+        # After flushing our existing batch, we now buffer the current row
+        # for the next flush
+        self._rows_buffer[destination].append(row_and_insert_id)
+        self._destination_buffer_byte_size[destination] = row_byte_size
+        return flushed_batch
+
       self._rows_buffer[destination].append(row_and_insert_id)
+      self._destination_buffer_byte_size[destination] += row_byte_size
       self._total_buffered_rows += 1
-      if len(self._rows_buffer[destination]) >= self._max_batch_size:
-        return self._flush_batch(destination)
-      elif self._total_buffered_rows >= self._max_buffered_rows:
+      if self._total_buffered_rows >= self._max_buffered_rows:
         return self._flush_all_batches()
     else:
       # The input is already batched per destination, flush the rows now.
@@ -1549,7 +1592,6 @@ class BigQueryWriteFn(DoFn):
     # Flush the current batch of rows to BigQuery.
     rows_and_insert_ids = self._rows_buffer[destination]
     table_reference = bigquery_tools.parse_table_reference(destination)
-
     if table_reference.projectId is None:
       table_reference.projectId = vp.RuntimeValueProvider.get_value(
           'project', str, '')
@@ -1615,6 +1657,8 @@ class BigQueryWriteFn(DoFn):
 
     self._total_buffered_rows -= len(self._rows_buffer[destination])
     del self._rows_buffer[destination]
+    if destination in self._destination_buffer_byte_size:
+      del self._destination_buffer_byte_size[destination]
 
     return itertools.chain([
         pvalue.TaggedOutput(
@@ -1657,7 +1701,8 @@ class _StreamToBigQuery(PTransform):
       with_auto_sharding,
       num_streaming_keys=DEFAULT_SHARDS_PER_DESTINATION,
       test_client=None,
-      max_retries=None):
+      max_retries=None,
+      max_insert_payload_size=MAX_INSERT_PAYLOAD_SIZE):
     self.table_reference = table_reference
     self.table_side_inputs = table_side_inputs
     self.schema_side_inputs = schema_side_inputs
@@ -1675,6 +1720,7 @@ class _StreamToBigQuery(PTransform):
     self.with_auto_sharding = with_auto_sharding
     self._num_streaming_keys = num_streaming_keys
     self.max_retries = max_retries or MAX_INSERT_RETRIES
+    self._max_insert_payload_size = max_insert_payload_size
 
   class InsertIdPrefixFn(DoFn):
     def start_bundle(self):
@@ -1701,7 +1747,8 @@ class _StreamToBigQuery(PTransform):
         ignore_insert_ids=self.ignore_insert_ids,
         ignore_unknown_columns=self.ignore_unknown_columns,
         with_batched_input=self.with_auto_sharding,
-        max_retries=self.max_retries)
+        max_retries=self.max_retries,
+        max_insert_payload_size=self._max_insert_payload_size)
 
     def _add_random_shard(element):
       key = element[0]
@@ -1801,6 +1848,7 @@ class WriteToBigQuery(PTransform):
       with_auto_sharding=False,
       ignore_unknown_columns=False,
       load_job_project_id=None,
+      max_insert_payload_size=MAX_INSERT_PAYLOAD_SIZE,
       num_streaming_keys=DEFAULT_SHARDS_PER_DESTINATION,
       expansion_service=None):
     """Initialize a WriteToBigQuery transform.
@@ -1960,6 +2008,8 @@ bigquery_v2_messages.TableSchema`. or a `ValueProvider` that has a JSON string,
       expansion_service: The address (host:port) of the expansion service.
         If no expansion service is provided, will attempt to run the default
         GCP expansion service. Used for STORAGE_WRITE_API method.
+      max_insert_payload_size: The maximum byte size for a BigQuery legacy
+        streaming insert payload.
     """
     self._table = table
     self._dataset = dataset
@@ -1997,6 +2047,7 @@ bigquery_v2_messages.TableSchema`. or a `ValueProvider` that has a JSON string,
     self._ignore_insert_ids = ignore_insert_ids
     self._ignore_unknown_columns = ignore_unknown_columns
     self.load_job_project_id = load_job_project_id
+    self._max_insert_payload_size = max_insert_payload_size
     self._num_streaming_keys = num_streaming_keys
 
   # Dict/schema methods were moved to bigquery_tools, but keep references
@@ -2045,6 +2096,12 @@ bigquery_v2_messages.TableSchema`. or a `ValueProvider` that has a JSON string,
             'triggering_frequency with STREAMING_INSERTS can only be used with '
             'with_auto_sharding=True.')
 
+      if self._max_insert_payload_size > MAX_INSERT_PAYLOAD_SIZE:
+        raise ValueError(
+            'max_insert_payload_size can only go up to '
+            f'{MAX_INSERT_PAYLOAD_SIZE} bytes, as per BigQuery quota limits: '
+            'https://cloud.google.com/bigquery/quotas#streaming_inserts.')
+
       outputs = pcoll | _StreamToBigQuery(
           table_reference=self.table_reference,
           table_side_inputs=self.table_side_inputs,
@@ -2061,6 +2118,7 @@ bigquery_v2_messages.TableSchema`. or a `ValueProvider` that has a JSON string,
           ignore_unknown_columns=self._ignore_unknown_columns,
           with_auto_sharding=self.with_auto_sharding,
           test_client=self.test_client,
+          max_insert_payload_size=self._max_insert_payload_size,
           num_streaming_keys=self._num_streaming_keys)
 
       return WriteResult(
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_test.py b/sdks/python/apache_beam/io/gcp/bigquery_test.py
index 0b37ca46e1a..fe1a568f414 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_test.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_test.py
@@ -788,6 +788,37 @@ class TestWriteToBigQuery(unittest.TestCase):
               with_auto_sharding=True,
               test_client=client))
 
+  @mock.patch('google.cloud.bigquery.Client.insert_rows_json')
+  def test_streaming_inserts_flush_on_byte_size_limit(self, mock_insert):
+    mock_insert.return_value = []
+    table = 'project:dataset.table'
+    rows = [
+        {
+            'columnA': 'value1'
+        },
+        {
+            'columnA': 'value2'
+        },
+        # this very large row exceeds max size, so should be sent to DLQ
+        {
+            'columnA': "large_string" * 100
+        }
+    ]
+    with beam.Pipeline() as p:
+      failed_rows = (
+          p
+          | beam.Create(rows)
+          | WriteToBigQuery(
+              table=table,
+              method='STREAMING_INSERTS',
+              create_disposition='CREATE_NEVER',
+              schema='columnA:STRING',
+              max_insert_payload_size=500))
+
+      expected_failed_rows = [(table, rows[2])]
+      assert_that(failed_rows.failed_rows, equal_to(expected_failed_rows))
+    self.assertEqual(2, mock_insert.call_count)
+
   @parameterized.expand([
       param(
           exception_type=exceptions.Forbidden if exceptions else None,
@@ -1206,6 +1237,7 @@ class BigQueryStreamingInsertTransformTests(unittest.TestCase):
     fn.start_bundle()
     fn.process(('project-id:dataset_id.table_id', ({'month': 1}, 'insertid1')))
     fn.process(('project-id:dataset_id.table_id', ({'month': 2}, 'insertid2')))
+    fn.finish_bundle()
     # InsertRows called as batch size is hit
     self.assertTrue(client.insert_rows_json.called)
 
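A companion sketch (again not part of the commit) showing how a pipeline
might consume the rows this change routes to the dead-letter outputs instead
of inserting them. Per the new code path, a row whose own size meets or
exceeds max_insert_payload_size is emitted on the FAILED_ROWS output as a
(table, row) tuple and on FAILED_ROWS_WITH_ERRORS as a (table, row, error)
tuple; the failed_rows attribute used below is the one exercised by the new
test, and the destination table is hypothetical.

    import logging

    import apache_beam as beam
    from apache_beam.io.gcp.bigquery import WriteToBigQuery

    def log_rejected(element):
        # Elements on failed_rows are (table, row) tuples.
        table, row = element
        logging.warning('Row too large for table %s: %r', table, row)
        return element

    with beam.Pipeline() as p:
        result = (
            p
            # A single oversized row (about 10 MiB) to trigger the new path.
            | beam.Create([{'columnA': 'x' * (10 << 20)}])
            | WriteToBigQuery(
                table='project:dataset.table',  # hypothetical destination
                method='STREAMING_INSERTS',
                schema='columnA:STRING'))

        # Oversized rows land here rather than being sent to BigQuery.
        _ = result.failed_rows | beam.Map(log_rejected)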
