gemini-code-assist[bot] commented on code in PR #36791:
URL: https://github.com/apache/beam/pull/36791#discussion_r2519975198
##########
sdks/python/apache_beam/transforms/enrichment_handlers/bigquery_it_test.py:
##########
@@ -355,6 +355,129 @@ def test_bigquery_enrichment_with_redis(self):
assert_that(pcoll_cached, equal_to(expected_rows))
BigQueryEnrichmentHandler.__call__ = actual
+ def test_bigquery_enrichment_no_results_throws_exception(self):
+ requests = [
+ beam.Row(id=999, name='X'), # This ID does not exist
+ ]
+ handler = BigQueryEnrichmentHandler(
+ project=self.project,
+ row_restriction_template="id = {}",
+ table_name=self.table_name,
+ fields=['id'],
+ throw_exception_on_empty_results=True,
+ )
+
+ with self.assertRaisesRegex(ValueError, "no matching row found for query"):
+ with TestPipeline(is_integration_test=True) as test_pipeline:
+ _ = (test_pipeline | beam.Create(requests) | Enrichment(handler))
+
+ def test_bigquery_enrichment_no_results_graceful(self):
+ requests = [
+ beam.Row(id=999, name='X'), # This ID does not exist
+ beam.Row(id=1000, name='Y'), # This ID does not exist
+ ]
+ # When no results are found and not throwing, Enrichment yields original.
+ expected_rows = requests
+
+ handler = BigQueryEnrichmentHandler(
+ project=self.project,
+ row_restriction_template="id = {}",
+ table_name=self.table_name,
+ fields=['id'],
+ min_batch_size=1,
+ max_batch_size=100,
+ throw_exception_on_empty_results=False,
+ )
+
+ with TestPipeline(is_integration_test=True) as test_pipeline:
+ pcoll = (test_pipeline | beam.Create(requests) | Enrichment(handler))
+ assert_that(pcoll, equal_to(expected_rows))
+
+ def test_bigquery_enrichment_no_results_partial_graceful_batched_partial(
+ self):
+ requests = [
+ beam.Row(id=1, name='A'), # This ID exists
+ beam.Row(id=1000, name='Y'), # This ID does not exist
+ ]
+ # When no results are found and not throwing, Enrichment yields original.
+ expected_rows = [
+ beam.Row(id=1, name='A', quantity=2, distribution_center_id=3),
+ beam.Row(id=1000,
+ name='Y'), # This ID does not exist so remains unchanged
+ ]
+
+ handler = BigQueryEnrichmentHandler(
+ project=self.project,
+ row_restriction_template="id = {}",
+ table_name=self.table_name,
+ fields=['id'],
+ min_batch_size=2,
+ max_batch_size=100,
+ throw_exception_on_empty_results=False,
+ )
+
+ with TestPipeline(is_integration_test=True) as test_pipeline:
+ pcoll = (test_pipeline | beam.Create(requests) | Enrichment(handler))
+ assert_that(pcoll, equal_to(expected_rows))
+
+ def test_bigquery_enrichment_no_results_graceful_batched(self):
+ requests = [
+ beam.Row(id=999, name='X'), # This ID does not exist
+ beam.Row(id=1000, name='Y'), # This ID does not exist
+ ]
+ # When no results are found and not throwing, Enrichment yields original.
+ expected_rows = requests
+
+ handler = BigQueryEnrichmentHandler(
+ project=self.project,
+ row_restriction_template="id = {}",
+ table_name=self.table_name,
+ fields=['id'],
+ min_batch_size=2,
+ max_batch_size=100,
+ throw_exception_on_empty_results=False,
+ )
+
+ with TestPipeline(is_integration_test=True) as test_pipeline:
+ pcoll = (test_pipeline | beam.Create(requests) | Enrichment(handler))
+ assert_that(pcoll, equal_to(expected_rows))
+
+ def test_bigquery_enrichment_no_results_with_query_fn_throws_exception(self):
+ requests = [
+ beam.Row(id=999, name='X'), # This ID does not exist
+ ]
+ # This query_fn will return no results
+ fn = functools.partial(query_fn, self.table_name)
+ handler = BigQueryEnrichmentHandler(
+ project=self.project,
+ query_fn=fn,
+ throw_exception_on_empty_results=True,
+ )
+
+ with self.assertRaisesRegex(ValueError, "no matching row found for query"):
+ with TestPipeline(is_integration_test=True) as test_pipeline:
+ _ = (test_pipeline | beam.Create(requests) | Enrichment(handler))
+
+ def test_bigquery_enrichment_no_results_with_query_fn_graceful(self):
+ requests = [
+ beam.Row(id=999, name='X'), # This ID does not exist
+ beam.Row(id=1000, name='Y'), # This ID does not exist
+ ]
+ # When no results are found and not throwing, Enrichment yields original.
+ expected_rows = requests
+
+ # This query_fn will return no results
+ fn = functools.partial(query_fn, self.table_name)
+ handler = BigQueryEnrichmentHandler(
+ project=self.project,
+ query_fn=fn,
+ throw_exception_on_empty_results=False,
+ )
+
+ with TestPipeline(is_integration_test=True) as test_pipeline:
+ pcoll = (test_pipeline | beam.Create(requests) | Enrichment(handler))
+ assert_that(pcoll, equal_to(expected_rows))
+
Review Comment:

The test suite is very comprehensive. To make it even more complete and to
verify the behavior for partial results in a batch when exceptions are enabled,
consider adding a test case for batched requests with partial results when
`throw_exception_on_empty_results=True`. This would confirm that an exception
is raised for the entire batch, which appears to be the intended behavior. Here
is a suggestion for such a test:
```python
def test_bigquery_enrichment_partial_results_throws_exception_batched(self):
  requests = [
      beam.Row(id=1, name='A'),  # This ID exists
      beam.Row(id=1000, name='Y'),  # This ID does not exist
  ]
  handler = BigQueryEnrichmentHandler(
      project=self.project,
      row_restriction_template="id = {}",
      table_name=self.table_name,
      fields=['id'],
      min_batch_size=2,
      max_batch_size=100,
      throw_exception_on_empty_results=True,
  )
  with self.assertRaisesRegex(ValueError, "no matching row found for query"):
    with TestPipeline(is_integration_test=True) as test_pipeline:
      _ = (test_pipeline | beam.Create(requests) | Enrichment(handler))
```
##########
sdks/python/apache_beam/transforms/enrichment_handlers/bigquery_it_test.py:
##########
@@ -355,6 +355,129 @@ def test_bigquery_enrichment_with_redis(self):
assert_that(pcoll_cached, equal_to(expected_rows))
BigQueryEnrichmentHandler.__call__ = actual
+ def test_bigquery_enrichment_no_results_throws_exception(self):
+ requests = [
+ beam.Row(id=999, name='X'), # This ID does not exist
+ ]
+ handler = BigQueryEnrichmentHandler(
+ project=self.project,
+ row_restriction_template="id = {}",
+ table_name=self.table_name,
+ fields=['id'],
+ throw_exception_on_empty_results=True,
+ )
+
+ with self.assertRaisesRegex(ValueError, "no matching row found for query"):
+ with TestPipeline(is_integration_test=True) as test_pipeline:
+ _ = (test_pipeline | beam.Create(requests) | Enrichment(handler))
+
+ def test_bigquery_enrichment_no_results_graceful(self):
+ requests = [
+ beam.Row(id=999, name='X'), # This ID does not exist
+ beam.Row(id=1000, name='Y'), # This ID does not exist
+ ]
+ # When no results are found and not throwing, Enrichment yields original.
+ expected_rows = requests
+
+ handler = BigQueryEnrichmentHandler(
+ project=self.project,
+ row_restriction_template="id = {}",
+ table_name=self.table_name,
+ fields=['id'],
+ min_batch_size=1,
+ max_batch_size=100,
+ throw_exception_on_empty_results=False,
+ )
+
+ with TestPipeline(is_integration_test=True) as test_pipeline:
+ pcoll = (test_pipeline | beam.Create(requests) | Enrichment(handler))
+ assert_that(pcoll, equal_to(expected_rows))
+
+ def test_bigquery_enrichment_no_results_partial_graceful_batched_partial(
Review Comment:

There seems to be a typo in the test name. `_partial` is repeated. It should
probably be `test_bigquery_enrichment_no_results_graceful_batched_partial`.
```suggestion
def test_bigquery_enrichment_no_results_graceful_batched_partial(
```
##########
sdks/python/apache_beam/transforms/enrichment_handlers/bigquery.py:
##########
@@ -204,11 +212,22 @@ def __call__(self, request: Union[beam.Row,
list[beam.Row]], *args, **kwargs):
query = raw_query.format(*values)
responses_dict = self._execute_query(query)
- for response in responses_dict:
- response_row = beam.Row(**response)
- response_key = self.create_row_key(response_row)
- if response_key in requests_map:
- responses.append((requests_map[response_key], response_row))
+ if responses_dict is not None:
+ for response in responses_dict:
+ response_row = beam.Row(**response)
+ response_key = self.create_row_key(response_row)
+ if response_key in requests_map:
+ responses.append((requests_map[response_key], response_row))
+ if len(responses) < len(request):
+ if self.throw_exception_on_empty_results:
+ raise ValueError("no matching row found for query: " + query)
+ else:
+ _LOGGER.warning('no matching row found for query: ' + query)
Review Comment:

For logging, it's a best practice to pass values as arguments to the logging
call (using `%s`-style placeholders) instead of concatenating them into the
message. This defers the string formatting until it's certain the message
needs to be logged, which can be more efficient. This also applies to the
duplicated logic on line 249.
```suggestion
_LOGGER.warning('no matching row found for query: %s', query)
```
##########
sdks/python/apache_beam/transforms/enrichment_handlers/bigquery.py:
##########
@@ -204,11 +212,22 @@ def __call__(self, request: Union[beam.Row,
list[beam.Row]], *args, **kwargs):
query = raw_query.format(*values)
responses_dict = self._execute_query(query)
- for response in responses_dict:
- response_row = beam.Row(**response)
- response_key = self.create_row_key(response_row)
- if response_key in requests_map:
- responses.append((requests_map[response_key], response_row))
+ if responses_dict is not None:
+ for response in responses_dict:
+ response_row = beam.Row(**response)
+ response_key = self.create_row_key(response_row)
+ if response_key in requests_map:
+ responses.append((requests_map[response_key], response_row))
+ if len(responses) < len(request):
+ if self.throw_exception_on_empty_results:
+ raise ValueError("no matching row found for query: " + query)
Review Comment:

For consistency with other error messages in this file (e.g., line 174) and
for better performance, consider using an f-string for formatting the error
message. This also applies to the duplicated logic on line 247.
```suggestion
raise ValueError(f"no matching row found for query: {query}")
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]