This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 6e7f893  [py] Supporting ignore_unknown_values for WriteToBigQuery
     new 3b9d1d5  Merge pull request #16081 from [BEAM-9706][py] Supporting 
ignore_unknown_values for WriteToBigQuery
6e7f893 is described below

commit 6e7f8938e727068f8712303465fbe87205f24f28
Author: Pablo Estrada <pabl...@apache.org>
AuthorDate: Mon Nov 29 12:13:29 2021 -0800

    [py] Supporting ignore_unknown_values for WriteToBigQuery
---
 sdks/python/apache_beam/io/gcp/bigquery.py         | 24 ++++++++++++++++++----
 sdks/python/apache_beam/io/gcp/bigquery_tools.py   | 17 +++++++++++----
 .../apache_beam/io/gcp/bigquery_tools_test.py      |  5 +++--
 3 files changed, 36 insertions(+), 10 deletions(-)

diff --git a/sdks/python/apache_beam/io/gcp/bigquery.py 
b/sdks/python/apache_beam/io/gcp/bigquery.py
index f41629b..0d98d7b 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery.py
@@ -1489,7 +1489,8 @@ class BigQueryWriteFn(DoFn):
       retry_strategy=None,
       additional_bq_parameters=None,
       ignore_insert_ids=False,
-      with_batched_input=False):
+      with_batched_input=False,
+      ignore_unknown_columns=False):
     """Initialize a WriteToBigQuery transform.
 
     Args:
@@ -1533,6 +1534,11 @@ class BigQueryWriteFn(DoFn):
       with_batched_input: Whether the input has already been batched per
         destination. If not, perform best-effort batching per destination 
within
         a bundle.
+      ignore_unknown_columns: Accept rows that contain values that do not match
+        the schema. The unknown values are ignored. Default is False,
+        which treats unknown values as errors. See reference:
+        
https://cloud.google.com/bigquery/docs/reference/rest/v2/tabledata/insertAll
+
     """
     self.schema = schema
     self.test_client = test_client
@@ -1568,6 +1574,7 @@ class BigQueryWriteFn(DoFn):
     self.bigquery_wrapper = None
     self.streaming_api_logging_frequency_sec = (
         BigQueryWriteFn.STREAMING_API_LOGGING_FREQUENCY_SEC)
+    self.ignore_unknown_columns = ignore_unknown_columns
 
   def display_data(self):
     return {
@@ -1577,7 +1584,8 @@ class BigQueryWriteFn(DoFn):
         'create_disposition': str(self.create_disposition),
         'write_disposition': str(self.write_disposition),
         'additional_bq_parameters': str(self.additional_bq_parameters),
-        'ignore_insert_ids': str(self.ignore_insert_ids)
+        'ignore_insert_ids': str(self.ignore_insert_ids),
+        'ignore_unknown_columns': str(self.ignore_unknown_columns)
     }
 
   def _reset_rows_buffer(self):
@@ -1725,7 +1733,8 @@ class BigQueryWriteFn(DoFn):
           table_id=table_reference.tableId,
           rows=rows,
           insert_ids=insert_ids,
-          skip_invalid_rows=True)
+          skip_invalid_rows=True,
+          ignore_unknown_values=self.ignore_unknown_columns)
       self.batch_latency_metric.update((time.time() - start) * 1000)
 
       failed_rows = [rows[entry['index']] for entry in errors]
@@ -1916,7 +1925,8 @@ class WriteToBigQuery(PTransform):
       temp_file_format=None,
       ignore_insert_ids=False,
       # TODO(BEAM-11857): Switch the default when the feature is mature.
-      with_auto_sharding=False):
+      with_auto_sharding=False,
+      ignore_unknown_columns=False):
     """Initialize a WriteToBigQuery transform.
 
     Args:
@@ -2043,6 +2053,11 @@ bigquery_v2_messages.TableSchema`. or a `ValueProvider` 
that has a JSON string,
         determined number of shards to write to BigQuery. This can be used for
         both FILE_LOADS and STREAMING_INSERTS. Only applicable to unbounded
         input.
+      ignore_unknown_columns: Accept rows that contain values that do not match
+        the schema. The unknown values are ignored. Default is False,
+        which treats unknown values as errors. This option is only valid for
+        method=STREAMING_INSERTS. See reference:
+        
https://cloud.google.com/bigquery/docs/reference/rest/v2/tabledata/insertAll
     """
     self._table = table
     self._dataset = dataset
@@ -2076,6 +2091,7 @@ bigquery_v2_messages.TableSchema`. or a `ValueProvider` 
that has a JSON string,
     self.table_side_inputs = table_side_inputs or ()
     self.schema_side_inputs = schema_side_inputs or ()
     self._ignore_insert_ids = ignore_insert_ids
+    self._ignore_unknown_columns = ignore_unknown_columns
 
   # Dict/schema methods were moved to bigquery_tools, but keep references
   # here for backward compatibility.
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_tools.py 
b/sdks/python/apache_beam/io/gcp/bigquery_tools.py
index c1bc2d9..56425f7 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_tools.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_tools.py
@@ -663,7 +663,8 @@ class BigQueryWrapper(object):
       table_id,
       rows,
       insert_ids,
-      skip_invalid_rows=False):
+      skip_invalid_rows=False,
+      ignore_unknown_values=False):
     """Calls the insertAll BigQuery API endpoint.
 
     Docs for this BQ call: https://cloud.google.com/bigquery/docs/reference\
@@ -697,7 +698,8 @@ class BigQueryWrapper(object):
           table_ref_str,
           json_rows=rows,
           row_ids=insert_ids,
-          skip_invalid_rows=True,
+          skip_invalid_rows=skip_invalid_rows,
+          ignore_unknown_values=ignore_unknown_values,
           timeout=BQ_STREAMING_INSERT_TIMEOUT_SEC)
       if not errors:
         service_call_metric.call('ok')
@@ -1217,7 +1219,8 @@ class BigQueryWrapper(object):
       table_id,
       rows,
       insert_ids=None,
-      skip_invalid_rows=False):
+      skip_invalid_rows=False,
+      ignore_unknown_values=False):
     """Inserts rows into the specified table.
 
     Args:
@@ -1228,6 +1231,10 @@ class BigQueryWrapper(object):
         each key in it is the name of a field.
       skip_invalid_rows: If there are rows with insertion errors, whether they
         should be skipped, and all others should be inserted successfully.
+      ignore_unknown_values: Set this option to true to ignore unknown column
+        names. If the input rows contain columns that are not
+        part of the existing table's schema, those columns are ignored, and
+        the rows are successfully inserted.
 
     Returns:
       A tuple (bool, errors). If first element is False then the second element
@@ -1250,7 +1257,9 @@ class BigQueryWrapper(object):
     ]
 
     result, errors = self._insert_all_rows(
-        project_id, dataset_id, table_id, rows, insert_ids)
+        project_id, dataset_id, table_id, rows, insert_ids,
+        skip_invalid_rows=skip_invalid_rows,
+        ignore_unknown_values=ignore_unknown_values)
     return result, errors
 
   def _convert_cell_value_to_dict(self, value, field):
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_tools_test.py 
b/sdks/python/apache_beam/io/gcp/bigquery_tools_test.py
index e26cfb5..f9174f5 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_tools_test.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_tools_test.py
@@ -959,8 +959,9 @@ class TestBigQueryWriter(unittest.TestCase):
         '%s.%s.%s' % ('project', 'dataset', 'table'),
         json_rows=[sample_row],
         row_ids=['_1'],
-        skip_invalid_rows=True,
-        timeout=120)
+        skip_invalid_rows=False,
+        timeout=120,
+        ignore_unknown_values=False)
 
   def test_table_schema_without_project(self):
     # Writer should pick executing project by default.

Reply via email to