gemini-code-assist[bot] commented on code in PR #39078:
URL: https://github.com/apache/beam/pull/39078#discussion_r3463226000
##########
sdks/python/apache_beam/io/gcp/bigquery.py:
##########
@@ -2252,6 +2297,16 @@ def _compute_method(self, experiments, is_streaming_pipeline):
else:
return self.method
+ def _additional_bq_parameters_for_file_loads(self):
+ if self.schema_update_options is None:
+ return self.additional_bq_parameters
+ if (callable(self.additional_bq_parameters) or
+ isinstance(self.additional_bq_parameters, vp.ValueProvider)):
+ return _AdditionalBQParametersWithSchemaUpdateOptions(
+ self.additional_bq_parameters, self.schema_update_options)
+ return _merge_schema_update_options(
+ self.additional_bq_parameters, self.schema_update_options)
Review Comment:

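If `schema_update_options` is a `ValueProvider`, we must treat the entire
`additional_bq_parameters` as dynamic and wrap it in
`_AdditionalBQParametersWithSchemaUpdateOptions`. Otherwise the static path
calls `_merge_schema_update_options` immediately and stores the unresolved
`ValueProvider` object in the static dictionary instead of its runtime value.
Note that the wrapper only resolves the `ValueProvider` at execution time once
its `__call__` also calls `.get()` on `schema_update_options` (see the next
comment).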
```suggestion
def _additional_bq_parameters_for_file_loads(self):
if self.schema_update_options is None:
return self.additional_bq_parameters
if (callable(self.additional_bq_parameters) or
isinstance(self.additional_bq_parameters, vp.ValueProvider) or
isinstance(self.schema_update_options, vp.ValueProvider)):
return _AdditionalBQParametersWithSchemaUpdateOptions(
self.additional_bq_parameters, self.schema_update_options)
return _merge_schema_update_options(
self.additional_bq_parameters, self.schema_update_options)
```
##########
sdks/python/apache_beam/io/gcp/bigquery.py:
##########
@@ -483,6 +484,35 @@ def chain_after(result):
"""
MAX_INSERT_PAYLOAD_SIZE = 9 << 20
+_SCHEMA_UPDATE_OPTIONS = 'schemaUpdateOptions'
+
+
+def _merge_schema_update_options(
+ additional_bq_parameters, schema_update_options):
+ additional_bq_parameters = dict(additional_bq_parameters or {})
+ if _SCHEMA_UPDATE_OPTIONS in additional_bq_parameters:
+ raise ValueError(
+ '%s can be set with either schema_update_options or '
+ 'additional_bq_parameters, but not both.' % _SCHEMA_UPDATE_OPTIONS)
+ additional_bq_parameters[_SCHEMA_UPDATE_OPTIONS] = schema_update_options
+ return additional_bq_parameters
+
+
+class _AdditionalBQParametersWithSchemaUpdateOptions(object):
+ def __init__(self, additional_bq_parameters, schema_update_options):
+ self.additional_bq_parameters = additional_bq_parameters
+ self.schema_update_options = schema_update_options
+
+ def __call__(self, destination):
+ if callable(self.additional_bq_parameters):
+ additional_bq_parameters = self.additional_bq_parameters(destination)
+ elif isinstance(self.additional_bq_parameters, vp.ValueProvider):
+ additional_bq_parameters = self.additional_bq_parameters.get()
+ else:
+ additional_bq_parameters = self.additional_bq_parameters
+ return _merge_schema_update_options(
+ additional_bq_parameters, self.schema_update_options)
Review Comment:

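To fully support `ValueProvider`s for `schema_update_options`, we should
resolve the `ValueProvider` at execution time inside the `__call__` method of
`_AdditionalBQParametersWithSchemaUpdateOptions`. As written, `__call__`
forwards `self.schema_update_options` unchanged, so a `ValueProvider` would be
merged into the load-job parameters unresolved.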
```suggestion
class _AdditionalBQParametersWithSchemaUpdateOptions(object):
def __init__(self, additional_bq_parameters, schema_update_options):
self.additional_bq_parameters = additional_bq_parameters
self.schema_update_options = schema_update_options
def __call__(self, destination):
if callable(self.additional_bq_parameters):
additional_bq_parameters = self.additional_bq_parameters(destination)
elif isinstance(self.additional_bq_parameters, vp.ValueProvider):
additional_bq_parameters = self.additional_bq_parameters.get()
else:
additional_bq_parameters = self.additional_bq_parameters
schema_update_options = (
self.schema_update_options.get()
if isinstance(self.schema_update_options, vp.ValueProvider)
else self.schema_update_options)
return _merge_schema_update_options(
additional_bq_parameters, schema_update_options)
```
##########
sdks/python/apache_beam/io/gcp/bigquery.py:
##########
@@ -483,6 +484,35 @@ def chain_after(result):
"""
MAX_INSERT_PAYLOAD_SIZE = 9 << 20
+_SCHEMA_UPDATE_OPTIONS = 'schemaUpdateOptions'
+
+
+def _merge_schema_update_options(
+ additional_bq_parameters, schema_update_options):
+ additional_bq_parameters = dict(additional_bq_parameters or {})
+ if _SCHEMA_UPDATE_OPTIONS in additional_bq_parameters:
+ raise ValueError(
+ '%s can be set with either schema_update_options or '
+ 'additional_bq_parameters, but not both.' % _SCHEMA_UPDATE_OPTIONS)
+ additional_bq_parameters[_SCHEMA_UPDATE_OPTIONS] = schema_update_options
+ return additional_bq_parameters
Review Comment:

If `schema_update_options` is `None` (which can happen if a `ValueProvider`
resolves to `None` at runtime), merging it directly into
`additional_bq_parameters` will result in `{'schemaUpdateOptions': None}` being
sent to the BigQuery API, which is invalid and will cause the load job to fail.
We should only merge `schema_update_options` if it is not `None`.
```suggestion
def _merge_schema_update_options(
additional_bq_parameters, schema_update_options):
additional_bq_parameters = dict(additional_bq_parameters or {})
if schema_update_options is not None:
if _SCHEMA_UPDATE_OPTIONS in additional_bq_parameters:
raise ValueError(
'%s can be set with either schema_update_options or '
'additional_bq_parameters, but not both.' % _SCHEMA_UPDATE_OPTIONS)
additional_bq_parameters[_SCHEMA_UPDATE_OPTIONS] = schema_update_options
return additional_bq_parameters
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
users@infra.apache.org