This is an automated email from the ASF dual-hosted git repository.

yhu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git

The following commit(s) were added to refs/heads/master by this push:
     new 1842dd2f07a Respect BigQuery insert byte size limits (#24979)
1842dd2f07a is described below

commit 1842dd2f07aee9485c37333882c2f9d1e80f880c
Author: Ahmed Abualsaud <65791736+ahmedab...@users.noreply.github.com>
AuthorDate: Wed Jul 5 17:03:59 2023 +0000

    Respect BigQuery insert byte size limits (#24979)

    * limit insert request by bytes; unit test
    * make limit checks before adding row; log a warning when seeing a large row
    * send large rows to DLQ
    * fix tests
    * use number instead of variable
    * use global variable
    * use number
    * use global variable
    * use number
    * expose max payload size as a parameter
    * syntax fix
    * remove warning log; revert irrelevant tests
    * maintain buffer byte size for each destination
    * only delete buffer if it exists
    * attribute name change to indicate private
    * check user specified maximum is within allowed limits
---
 sdks/python/apache_beam/io/gcp/bigquery.py      | 74 ++++++++++++++++++++++---
 sdks/python/apache_beam/io/gcp/bigquery_test.py | 32 +++++++++++
 2 files changed, 98 insertions(+), 8 deletions(-)

diff --git a/sdks/python/apache_beam/io/gcp/bigquery.py b/sdks/python/apache_beam/io/gcp/bigquery.py
index d12dd276a1a..bcc04acb9a9 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery.py
@@ -371,6 +371,7 @@ from typing import Tuple
 from typing import Union

 import fastavro
+from objsize import get_deep_size

 import apache_beam as beam
 from apache_beam import coders
@@ -474,6 +475,13 @@ tried for a very long time. You may reduce this property to reduce the number
 of retries.
 """
 MAX_INSERT_RETRIES = 10000
+"""
+The maximum byte size for a BigQuery legacy streaming insert payload.
+
+Note: The actual limit is 10MB, but we set it to 9MB to make room for request
+overhead: https://cloud.google.com/bigquery/quotas#streaming_inserts
+"""
+MAX_INSERT_PAYLOAD_SIZE = 9 << 20


 @deprecated(since='2.11.0', current="bigquery_tools.parse_table_reference")
@@ -1325,7 +1333,8 @@ class BigQueryWriteFn(DoFn):
       ignore_insert_ids=False,
       with_batched_input=False,
       ignore_unknown_columns=False,
-      max_retries=MAX_INSERT_RETRIES):
+      max_retries=MAX_INSERT_RETRIES,
+      max_insert_payload_size=MAX_INSERT_PAYLOAD_SIZE):
     """Initialize a WriteToBigQuery transform.

     Args:
@@ -1376,7 +1385,8 @@ class BigQueryWriteFn(DoFn):
       max_retries: The number of times that we will retry inserting a group
         of rows into BigQuery. By default, we retry 10000 times with
         exponential backoffs (effectively retry forever).
-
+      max_insert_payload_size: The maximum byte size for a BigQuery legacy
+        streaming insert payload.
     """
     self.schema = schema
     self.test_client = test_client
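
For context on the two building blocks introduced above: 9 << 20 is
9 * 2**20 = 9,437,184 bytes (9 MiB), and row size is measured with
objsize.get_deep_size rather than sys.getsizeof, because only a deep
traversal counts the values nested inside a row dict. A minimal
illustration, separate from the commit itself (the row literal is made up):

    import sys

    from objsize import get_deep_size

    row = {'columnA': 'x' * 1_000_000}  # a row holding a ~1MB string value

    # getsizeof sees only the outer dict; get_deep_size also walks the
    # nested key/value objects, which is what the payload accounting needs.
    print(sys.getsizeof(row))   # a few hundred bytes: the dict alone
    print(get_deep_size(row))   # ~1MB: includes the nested string
    print(9 << 20)              # 9437184, the default payload cap
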
""" self.schema = schema self.test_client = test_client @@ -1414,6 +1424,7 @@ class BigQueryWriteFn(DoFn): BigQueryWriteFn.STREAMING_API_LOGGING_FREQUENCY_SEC) self.ignore_unknown_columns = ignore_unknown_columns self._max_retries = max_retries + self._max_insert_payload_size = max_insert_payload_size def display_data(self): return { @@ -1429,6 +1440,7 @@ class BigQueryWriteFn(DoFn): def _reset_rows_buffer(self): self._rows_buffer = collections.defaultdict(lambda: []) + self._destination_buffer_byte_size = collections.defaultdict(lambda: 0) @staticmethod def get_table_schema(schema): @@ -1515,11 +1527,42 @@ class BigQueryWriteFn(DoFn): if not self.with_batched_input: row_and_insert_id = element[1] + row_byte_size = get_deep_size(row_and_insert_id) + + # send large rows that exceed BigQuery insert limits to DLQ + if row_byte_size >= self._max_insert_payload_size: + row_mb_size = row_byte_size / 1_000_000 + max_mb_size = self._max_insert_payload_size / 1_000_000 + error = ( + f"Received row with size {row_mb_size}MB that exceeds " + f"the maximum insert payload size set ({max_mb_size}MB).") + return [ + pvalue.TaggedOutput( + BigQueryWriteFn.FAILED_ROWS_WITH_ERRORS, + GlobalWindows.windowed_value( + (destination, row_and_insert_id[0], error))), + pvalue.TaggedOutput( + BigQueryWriteFn.FAILED_ROWS, + GlobalWindows.windowed_value( + (destination, row_and_insert_id[0]))) + ] + + # Flush current batch first if adding this row will exceed our limits + # limits: byte size; number of rows + if ((self._destination_buffer_byte_size[destination] + row_byte_size > + self._max_insert_payload_size) or + len(self._rows_buffer[destination]) >= self._max_batch_size): + flushed_batch = self._flush_batch(destination) + # After flushing our existing batch, we now buffer the current row + # for the next flush + self._rows_buffer[destination].append(row_and_insert_id) + self._destination_buffer_byte_size[destination] = row_byte_size + return flushed_batch + self._rows_buffer[destination].append(row_and_insert_id) + self._destination_buffer_byte_size[destination] += row_byte_size self._total_buffered_rows += 1 - if len(self._rows_buffer[destination]) >= self._max_batch_size: - return self._flush_batch(destination) - elif self._total_buffered_rows >= self._max_buffered_rows: + if self._total_buffered_rows >= self._max_buffered_rows: return self._flush_all_batches() else: # The input is already batched per destination, flush the rows now. @@ -1549,7 +1592,6 @@ class BigQueryWriteFn(DoFn): # Flush the current batch of rows to BigQuery. 
@@ -1549,7 +1592,6 @@ class BigQueryWriteFn(DoFn):
     # Flush the current batch of rows to BigQuery.
     rows_and_insert_ids = self._rows_buffer[destination]
     table_reference = bigquery_tools.parse_table_reference(destination)
-
     if table_reference.projectId is None:
       table_reference.projectId = vp.RuntimeValueProvider.get_value(
           'project', str, '')
@@ -1615,6 +1657,8 @@ class BigQueryWriteFn(DoFn):

     self._total_buffered_rows -= len(self._rows_buffer[destination])
     del self._rows_buffer[destination]
+    if destination in self._destination_buffer_byte_size:
+      del self._destination_buffer_byte_size[destination]

     return itertools.chain([
         pvalue.TaggedOutput(
@@ -1657,7 +1701,8 @@ class _StreamToBigQuery(PTransform):
       with_auto_sharding,
       num_streaming_keys=DEFAULT_SHARDS_PER_DESTINATION,
       test_client=None,
-      max_retries=None):
+      max_retries=None,
+      max_insert_payload_size=MAX_INSERT_PAYLOAD_SIZE):
     self.table_reference = table_reference
     self.table_side_inputs = table_side_inputs
     self.schema_side_inputs = schema_side_inputs
@@ -1675,6 +1720,7 @@ class _StreamToBigQuery(PTransform):
     self.with_auto_sharding = with_auto_sharding
     self._num_streaming_keys = num_streaming_keys
     self.max_retries = max_retries or MAX_INSERT_RETRIES
+    self._max_insert_payload_size = max_insert_payload_size

   class InsertIdPrefixFn(DoFn):
     def start_bundle(self):
@@ -1701,7 +1747,8 @@ class _StreamToBigQuery(PTransform):
         ignore_insert_ids=self.ignore_insert_ids,
         ignore_unknown_columns=self.ignore_unknown_columns,
         with_batched_input=self.with_auto_sharding,
-        max_retries=self.max_retries)
+        max_retries=self.max_retries,
+        max_insert_payload_size=self._max_insert_payload_size)

     def _add_random_shard(element):
       key = element[0]
@@ -1801,6 +1848,7 @@ class WriteToBigQuery(PTransform):
       with_auto_sharding=False,
       ignore_unknown_columns=False,
       load_job_project_id=None,
+      max_insert_payload_size=MAX_INSERT_PAYLOAD_SIZE,
       num_streaming_keys=DEFAULT_SHARDS_PER_DESTINATION,
       expansion_service=None):
     """Initialize a WriteToBigQuery transform.
@@ -1960,6 +2008,8 @@ bigquery_v2_messages.TableSchema`. or a `ValueProvider` that has a JSON string,
       expansion_service: The address (host:port) of the expansion service.
         If no expansion service is provided, will attempt to run the default
         GCP expansion service. Used for STORAGE_WRITE_API method.
+      max_insert_payload_size: The maximum byte size for a BigQuery legacy
+        streaming insert payload.
     """
     self._table = table
     self._dataset = dataset
@@ -1997,6 +2047,7 @@ bigquery_v2_messages.TableSchema`. or a `ValueProvider` that has a JSON string,
     self._ignore_insert_ids = ignore_insert_ids
     self._ignore_unknown_columns = ignore_unknown_columns
     self.load_job_project_id = load_job_project_id
+    self._max_insert_payload_size = max_insert_payload_size
     self._num_streaming_keys = num_streaming_keys

     # Dict/schema methods were moved to bigquery_tools, but keep references
@@ -2045,6 +2096,12 @@ bigquery_v2_messages.TableSchema`. or a `ValueProvider` that has a JSON string,
            'triggering_frequency with STREAMING_INSERTS can only be used with '
            'with_auto_sharding=True.')

+      if self._max_insert_payload_size > MAX_INSERT_PAYLOAD_SIZE:
+        raise ValueError(
+            'max_insert_payload_size can only go up to '
+            f'{MAX_INSERT_PAYLOAD_SIZE} bytes, as per BigQuery quota limits: '
+            'https://cloud.google.com/bigquery/quotas#streaming_inserts.')
+
      outputs = pcoll | _StreamToBigQuery(
          table_reference=self.table_reference,
          table_side_inputs=self.table_side_inputs,
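
From the caller's side, the guard just added means the parameter can lower
the cap but never raise it past the module default. An illustrative snippet
(hypothetical table name; the check fires when the transform is applied with
STREAMING_INSERTS, since it runs during the transform's expansion):

    import apache_beam as beam
    from apache_beam.io.gcp.bigquery import WriteToBigQuery

    try:
      with beam.Pipeline() as p:
        _ = (
            p
            | beam.Create([{'columnA': 'value'}])
            | WriteToBigQuery(
                table='project:dataset.table',  # hypothetical
                method='STREAMING_INSERTS',
                schema='columnA:STRING',
                max_insert_payload_size=20 << 20))  # 20 MiB > the 9 MiB cap
    except ValueError as e:
      print(e)  # max_insert_payload_size can only go up to 9437184 bytes ...
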
@@ -2061,6 +2118,7 @@ bigquery_v2_messages.TableSchema`. or a `ValueProvider` that has a JSON string,
          ignore_unknown_columns=self._ignore_unknown_columns,
          with_auto_sharding=self.with_auto_sharding,
          test_client=self.test_client,
+          max_insert_payload_size=self._max_insert_payload_size,
          num_streaming_keys=self._num_streaming_keys)

      return WriteResult(
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_test.py b/sdks/python/apache_beam/io/gcp/bigquery_test.py
index 0b37ca46e1a..fe1a568f414 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_test.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_test.py
@@ -788,6 +788,37 @@ class TestWriteToBigQuery(unittest.TestCase):
             with_auto_sharding=True,
             test_client=client))

+  @mock.patch('google.cloud.bigquery.Client.insert_rows_json')
+  def test_streaming_inserts_flush_on_byte_size_limit(self, mock_insert):
+    mock_insert.return_value = []
+    table = 'project:dataset.table'
+    rows = [
+        {
+            'columnA': 'value1'
+        },
+        {
+            'columnA': 'value2'
+        },
+        # this very large row exceeds max size, so should be sent to DLQ
+        {
+            'columnA': "large_string" * 100
+        }
+    ]
+    with beam.Pipeline() as p:
+      failed_rows = (
+          p
+          | beam.Create(rows)
+          | WriteToBigQuery(
+              table=table,
+              method='STREAMING_INSERTS',
+              create_disposition='CREATE_NEVER',
+              schema='columnA:STRING',
+              max_insert_payload_size=500))
+
+      expected_failed_rows = [(table, rows[2])]
+      assert_that(failed_rows.failed_rows, equal_to(expected_failed_rows))
+    self.assertEqual(2, mock_insert.call_count)
+
   @parameterized.expand([
       param(
           exception_type=exceptions.Forbidden if exceptions else None,
@@ -1206,6 +1237,7 @@ class BigQueryStreamingInsertTransformTests(unittest.TestCase):
     fn.start_bundle()
     fn.process(('project-id:dataset_id.table_id', ({'month': 1}, 'insertid1')))
     fn.process(('project-id:dataset_id.table_id', ({'month': 2}, 'insertid2')))
+    fn.finish_bundle()

     # InsertRows called as batch size is hit
     self.assertTrue(client.insert_rows_json.called)
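
As the new test demonstrates, rows larger than max_insert_payload_size are
routed to the failed-rows outputs instead of being inserted. A hedged usage
sketch for consuming that dead-letter output (hypothetical table; running
against real BigQuery needs credentials and an existing table):

    import apache_beam as beam
    from apache_beam.io.gcp.bigquery import WriteToBigQuery

    with beam.Pipeline() as p:
      result = (
          p
          | beam.Create([{'columnA': 'small'}, {'columnA': 'x' * 1000}])
          | WriteToBigQuery(
              table='project:dataset.table',  # hypothetical
              method='STREAMING_INSERTS',
              schema='columnA:STRING',
              max_insert_payload_size=500))

      # Oversized rows arrive as (destination, row) tuples; route them to a
      # recovery sink (or log them) rather than dropping them.
      _ = result.failed_rows | 'HandleOversized' >> beam.Map(print)

The companion output, result.failed_rows_with_errors, additionally carries
the error string built in the DoFn above.
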