This is an automated email from the ASF dual-hosted git repository. pabloem pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push: new 6e7f893 [py] Supporting ignore_unknown_values for WriteToBigQuery new 3b9d1d5 Merge pull request #16081 from [BEAM-9706][py] Supporting ignore_unknown_values for WriteToBigQuery 6e7f893 is described below commit 6e7f8938e727068f8712303465fbe87205f24f28 Author: Pablo Estrada <pabl...@apache.org> AuthorDate: Mon Nov 29 12:13:29 2021 -0800 [py] Supporting ignore_unknown_values for WriteToBigQuery --- sdks/python/apache_beam/io/gcp/bigquery.py | 24 ++++++++++++++++++---- sdks/python/apache_beam/io/gcp/bigquery_tools.py | 17 +++++++++++---- .../apache_beam/io/gcp/bigquery_tools_test.py | 5 +++-- 3 files changed, 36 insertions(+), 10 deletions(-) diff --git a/sdks/python/apache_beam/io/gcp/bigquery.py b/sdks/python/apache_beam/io/gcp/bigquery.py index f41629b..0d98d7b 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery.py +++ b/sdks/python/apache_beam/io/gcp/bigquery.py @@ -1489,7 +1489,8 @@ class BigQueryWriteFn(DoFn): retry_strategy=None, additional_bq_parameters=None, ignore_insert_ids=False, - with_batched_input=False): + with_batched_input=False, + ignore_unknown_columns=False): """Initialize a WriteToBigQuery transform. Args: @@ -1533,6 +1534,11 @@ class BigQueryWriteFn(DoFn): with_batched_input: Whether the input has already been batched per destination. If not, perform best-effort batching per destination within a bundle. + ignore_unknown_columns: Accept rows that contain values that do not match + the schema. The unknown values are ignored. Default is False, + which treats unknown values as errors. See reference: + https://cloud.google.com/bigquery/docs/reference/rest/v2/tabledata/insertAll + """ self.schema = schema self.test_client = test_client @@ -1568,6 +1574,7 @@ class BigQueryWriteFn(DoFn): self.bigquery_wrapper = None self.streaming_api_logging_frequency_sec = ( BigQueryWriteFn.STREAMING_API_LOGGING_FREQUENCY_SEC) + self.ignore_unknown_columns = ignore_unknown_columns def display_data(self): return { @@ -1577,7 +1584,8 @@ class BigQueryWriteFn(DoFn): 'create_disposition': str(self.create_disposition), 'write_disposition': str(self.write_disposition), 'additional_bq_parameters': str(self.additional_bq_parameters), - 'ignore_insert_ids': str(self.ignore_insert_ids) + 'ignore_insert_ids': str(self.ignore_insert_ids), + 'ignore_unknown_columns': str(self.ignore_unknown_columns) } def _reset_rows_buffer(self): @@ -1725,7 +1733,8 @@ class BigQueryWriteFn(DoFn): table_id=table_reference.tableId, rows=rows, insert_ids=insert_ids, - skip_invalid_rows=True) + skip_invalid_rows=True, + ignore_unknown_values=self.ignore_unknown_columns) self.batch_latency_metric.update((time.time() - start) * 1000) failed_rows = [rows[entry['index']] for entry in errors] @@ -1916,7 +1925,8 @@ class WriteToBigQuery(PTransform): temp_file_format=None, ignore_insert_ids=False, # TODO(BEAM-11857): Switch the default when the feature is mature. - with_auto_sharding=False): + with_auto_sharding=False, + ignore_unknown_columns=False): """Initialize a WriteToBigQuery transform. Args: @@ -2043,6 +2053,11 @@ bigquery_v2_messages.TableSchema`. or a `ValueProvider` that has a JSON string, determined number of shards to write to BigQuery. This can be used for both FILE_LOADS and STREAMING_INSERTS. Only applicable to unbounded input. + ignore_unknown_columns: Accept rows that contain values that do not match + the schema. The unknown values are ignored. Default is False, + which treats unknown values as errors. This option is only valid for + method=STREAMING_INSERTS. See reference: + https://cloud.google.com/bigquery/docs/reference/rest/v2/tabledata/insertAll """ self._table = table self._dataset = dataset @@ -2076,6 +2091,7 @@ bigquery_v2_messages.TableSchema`. or a `ValueProvider` that has a JSON string, self.table_side_inputs = table_side_inputs or () self.schema_side_inputs = schema_side_inputs or () self._ignore_insert_ids = ignore_insert_ids + self._ignore_unknown_columns = ignore_unknown_columns # Dict/schema methods were moved to bigquery_tools, but keep references # here for backward compatibility. diff --git a/sdks/python/apache_beam/io/gcp/bigquery_tools.py b/sdks/python/apache_beam/io/gcp/bigquery_tools.py index c1bc2d9..56425f7 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery_tools.py +++ b/sdks/python/apache_beam/io/gcp/bigquery_tools.py @@ -663,7 +663,8 @@ class BigQueryWrapper(object): table_id, rows, insert_ids, - skip_invalid_rows=False): + skip_invalid_rows=False, + ignore_unknown_values=False): """Calls the insertAll BigQuery API endpoint. Docs for this BQ call: https://cloud.google.com/bigquery/docs/reference\ @@ -697,7 +698,8 @@ class BigQueryWrapper(object): table_ref_str, json_rows=rows, row_ids=insert_ids, - skip_invalid_rows=True, + skip_invalid_rows=skip_invalid_rows, + ignore_unknown_values=ignore_unknown_values, timeout=BQ_STREAMING_INSERT_TIMEOUT_SEC) if not errors: service_call_metric.call('ok') @@ -1217,7 +1219,8 @@ class BigQueryWrapper(object): table_id, rows, insert_ids=None, - skip_invalid_rows=False): + skip_invalid_rows=False, + ignore_unknown_values=False): """Inserts rows into the specified table. Args: @@ -1228,6 +1231,10 @@ class BigQueryWrapper(object): each key in it is the name of a field. skip_invalid_rows: If there are rows with insertion errors, whether they should be skipped, and all others should be inserted successfully. + ignore_unknown_values: Set this option to true to ignore unknown column + names. If the input rows contain columns that are not + part of the existing table's schema, those columns are ignored, and + the rows are successfully inserted. Returns: A tuple (bool, errors). If first element is False then the second element @@ -1250,7 +1257,9 @@ class BigQueryWrapper(object): ] result, errors = self._insert_all_rows( - project_id, dataset_id, table_id, rows, insert_ids) + project_id, dataset_id, table_id, rows, insert_ids, + skip_invalid_rows=skip_invalid_rows, + ignore_unknown_values=ignore_unknown_values) return result, errors def _convert_cell_value_to_dict(self, value, field): diff --git a/sdks/python/apache_beam/io/gcp/bigquery_tools_test.py b/sdks/python/apache_beam/io/gcp/bigquery_tools_test.py index e26cfb5..f9174f5 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery_tools_test.py +++ b/sdks/python/apache_beam/io/gcp/bigquery_tools_test.py @@ -959,8 +959,9 @@ class TestBigQueryWriter(unittest.TestCase): '%s.%s.%s' % ('project', 'dataset', 'table'), json_rows=[sample_row], row_ids=['_1'], - skip_invalid_rows=True, - timeout=120) + skip_invalid_rows=False, + timeout=120, + ignore_unknown_values=False) def test_table_schema_without_project(self): # Writer should pick executing project by default.