gemini-code-assist[bot] commented on code in PR #39078:
URL: https://github.com/apache/beam/pull/39078#discussion_r3463226000


##########
sdks/python/apache_beam/io/gcp/bigquery.py:
##########
@@ -2252,6 +2297,16 @@ def _compute_method(self, experiments, is_streaming_pipeline):
     else:
       return self.method
 
+  def _additional_bq_parameters_for_file_loads(self):
+    if self.schema_update_options is None:
+      return self.additional_bq_parameters
+    if (callable(self.additional_bq_parameters) or
+        isinstance(self.additional_bq_parameters, vp.ValueProvider)):
+      return _AdditionalBQParametersWithSchemaUpdateOptions(
+          self.additional_bq_parameters, self.schema_update_options)
+    return _merge_schema_update_options(
+        self.additional_bq_parameters, self.schema_update_options)

Review Comment:
   ![medium](https://www.gstatic.com/codereviewagent/medium-priority.svg)
   
   If `schema_update_options` is a `ValueProvider`, we must treat the entire
   `additional_bq_parameters` as dynamic and wrap it in
   `_AdditionalBQParametersWithSchemaUpdateOptions`, so that the `ValueProvider`
   is resolved at execution time rather than being passed as-is in a static
   dictionary.
   
   ```suggestion
     def _additional_bq_parameters_for_file_loads(self):
       if self.schema_update_options is None:
         return self.additional_bq_parameters
       if (callable(self.additional_bq_parameters) or
           isinstance(self.additional_bq_parameters, vp.ValueProvider) or
           isinstance(self.schema_update_options, vp.ValueProvider)):
         return _AdditionalBQParametersWithSchemaUpdateOptions(
             self.additional_bq_parameters, self.schema_update_options)
       return _merge_schema_update_options(
           self.additional_bq_parameters, self.schema_update_options)
   ```



##########
sdks/python/apache_beam/io/gcp/bigquery.py:
##########
@@ -483,6 +484,35 @@ def chain_after(result):
 """
 MAX_INSERT_PAYLOAD_SIZE = 9 << 20
 
+_SCHEMA_UPDATE_OPTIONS = 'schemaUpdateOptions'
+
+
+def _merge_schema_update_options(
+    additional_bq_parameters, schema_update_options):
+  additional_bq_parameters = dict(additional_bq_parameters or {})
+  if _SCHEMA_UPDATE_OPTIONS in additional_bq_parameters:
+    raise ValueError(
+        '%s can be set with either schema_update_options or '
+        'additional_bq_parameters, but not both.' % _SCHEMA_UPDATE_OPTIONS)
+  additional_bq_parameters[_SCHEMA_UPDATE_OPTIONS] = schema_update_options
+  return additional_bq_parameters
+
+
+class _AdditionalBQParametersWithSchemaUpdateOptions(object):
+  def __init__(self, additional_bq_parameters, schema_update_options):
+    self.additional_bq_parameters = additional_bq_parameters
+    self.schema_update_options = schema_update_options
+
+  def __call__(self, destination):
+    if callable(self.additional_bq_parameters):
+      additional_bq_parameters = self.additional_bq_parameters(destination)
+    elif isinstance(self.additional_bq_parameters, vp.ValueProvider):
+      additional_bq_parameters = self.additional_bq_parameters.get()
+    else:
+      additional_bq_parameters = self.additional_bq_parameters
+    return _merge_schema_update_options(
+        additional_bq_parameters, self.schema_update_options)

Review Comment:
   ![medium](https://www.gstatic.com/codereviewagent/medium-priority.svg)
   
   To fully support `ValueProvider`s for `schema_update_options`, we should
   resolve the `ValueProvider` at execution time inside the `__call__` method of
   `_AdditionalBQParametersWithSchemaUpdateOptions`.
   
   ```suggestion
   class _AdditionalBQParametersWithSchemaUpdateOptions(object):
     def __init__(self, additional_bq_parameters, schema_update_options):
       self.additional_bq_parameters = additional_bq_parameters
       self.schema_update_options = schema_update_options
   
     def __call__(self, destination):
       if callable(self.additional_bq_parameters):
         additional_bq_parameters = self.additional_bq_parameters(destination)
       elif isinstance(self.additional_bq_parameters, vp.ValueProvider):
         additional_bq_parameters = self.additional_bq_parameters.get()
       else:
         additional_bq_parameters = self.additional_bq_parameters
       schema_update_options = (
           self.schema_update_options.get()
           if isinstance(self.schema_update_options, vp.ValueProvider)
           else self.schema_update_options)
       return _merge_schema_update_options(
           additional_bq_parameters, schema_update_options)
   ```



##########
sdks/python/apache_beam/io/gcp/bigquery.py:
##########
@@ -483,6 +484,35 @@ def chain_after(result):
 """
 MAX_INSERT_PAYLOAD_SIZE = 9 << 20
 
+_SCHEMA_UPDATE_OPTIONS = 'schemaUpdateOptions'
+
+
+def _merge_schema_update_options(
+    additional_bq_parameters, schema_update_options):
+  additional_bq_parameters = dict(additional_bq_parameters or {})
+  if _SCHEMA_UPDATE_OPTIONS in additional_bq_parameters:
+    raise ValueError(
+        '%s can be set with either schema_update_options or '
+        'additional_bq_parameters, but not both.' % _SCHEMA_UPDATE_OPTIONS)
+  additional_bq_parameters[_SCHEMA_UPDATE_OPTIONS] = schema_update_options
+  return additional_bq_parameters

Review Comment:
   ![medium](https://www.gstatic.com/codereviewagent/medium-priority.svg)
   
   If `schema_update_options` is `None` (which can happen if a `ValueProvider`
   resolves to `None` at runtime), merging it directly into
   `additional_bq_parameters` will result in `{'schemaUpdateOptions': None}` being
   sent to the BigQuery API, which is invalid and will cause the load job to fail.
   We should only merge `schema_update_options` if it is not `None`.
   
   ```suggestion
   def _merge_schema_update_options(
       additional_bq_parameters, schema_update_options):
     additional_bq_parameters = dict(additional_bq_parameters or {})
     if schema_update_options is not None:
       if _SCHEMA_UPDATE_OPTIONS in additional_bq_parameters:
         raise ValueError(
             '%s can be set with either schema_update_options or '
             'additional_bq_parameters, but not both.' % _SCHEMA_UPDATE_OPTIONS)
       additional_bq_parameters[_SCHEMA_UPDATE_OPTIONS] = schema_update_options
     return additional_bq_parameters
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to