gemini-code-assist[bot] commented on code in PR #36791:
URL: https://github.com/apache/beam/pull/36791#discussion_r2519425759


##########
sdks/python/apache_beam/transforms/enrichment_handlers/bigquery.py:
##########
@@ -204,11 +212,23 @@ def __call__(self, request: Union[beam.Row, list[beam.Row]], *args, **kwargs):
       query = raw_query.format(*values)
 
       responses_dict = self._execute_query(query)
-      for response in responses_dict:
-        response_row = beam.Row(**response)
-        response_key = self.create_row_key(response_row)
-        if response_key in requests_map:
-          responses.append((requests_map[response_key], response_row))
+      if responses_dict is not None:
+        for response in responses_dict:
+          response_row = beam.Row(**response)
+          response_key = self.create_row_key(response_row)
+          if response_key in requests_map:
+            responses.append((requests_map[response_key], response_row))
+      if len(responses) < len(request):
+        if self.throw_execption_on_empty_results:
+          raise ValueError("no matching row found for query: " + query)
+        else:
+          _LOGGER.warning('no matching row found for query: ' + query)
+          # append empty rows for missing responses
+          for req in request:
+            req_key = self.create_row_key(req)
+            if not any(req_key == self.create_row_key(resp[0])
+                       for resp in responses):
+              responses.append((req, beam.Row()))

Review Comment:
   ![medium](https://www.gstatic.com/codereviewagent/medium-priority.svg)
   
   The loop that finds requests without a response is nested: for every
   request it rescans all responses, which is O(N*M), where N is the number of
   requests and M is the number of responses. With `max_batch_size` defaulting
   to 10000, this could be inefficient for large batches. You can reduce it to
   O(N+M) by building a set of keys from the found responses and checking it
   against the `requests_map` you've already built.
   
   ```suggestion
             # append empty rows for missing responses
             found_req_keys = {self.create_row_key(resp[0]) for resp in responses}
             for req_key, req in requests_map.items():
               if req_key not in found_req_keys:
                 responses.append((req, beam.Row()))
   ```
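
The set-based lookup can be tried outside Beam with a minimal sketch. It assumes plain dicts stand in for `beam.Row`, and `row_key` and `pad_missing` are hypothetical names standing in for `create_row_key` and the padding step; none of this is the handler's actual code.

```python
def row_key(row):
  """Hypothetical stand-in for create_row_key: key a row on its 'id' field."""
  return (row["id"],)


def pad_missing(requests, responses):
  """Return responses plus an empty row for every request without a match.

  `responses` is a list of (request, response_row) pairs. Plain dicts stand
  in for beam.Row here (an assumption made for illustration).
  """
  # One pass over the requests to build the lookup map.
  requests_map = {row_key(req): req for req in requests}
  # One pass over the responses to collect the keys that were answered.
  found_req_keys = {row_key(req) for req, _ in responses}
  padded = list(responses)
  # One more pass over the map; each membership test is O(1) on average.
  for req_key, req in requests_map.items():
    if req_key not in found_req_keys:
      padded.append((req, {}))
  return padded
```

One difference worth checking before adopting this: if `requests_map` is a plain dict keyed by row key, iterating over it visits each distinct key once, so duplicate requests that share a key would be padded once rather than once per occurrence.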



##########
sdks/python/apache_beam/transforms/enrichment_handlers/bigquery_it_test.py:
##########
@@ -355,6 +355,129 @@ def test_bigquery_enrichment_with_redis(self):
       assert_that(pcoll_cached, equal_to(expected_rows))
     BigQueryEnrichmentHandler.__call__ = actual
 
+  def test_bigquery_enrichment_no_results_throws_exception(self):
+    requests = [
+        beam.Row(id=999, name='X'),  # This ID does not exist
+    ]
+    handler = BigQueryEnrichmentHandler(
+        project=self.project,
+        row_restriction_template="id = {}",
+        table_name=self.table_name,
+        fields=['id'],
+        throw_execption_on_empty_results=True,

Review Comment:
   ![high](https://www.gstatic.com/codereviewagent/high-priority.svg)
   
   The parameter `throw_execption_on_empty_results` has a typo. It should be 
`throw_exception_on_empty_results`. This typo is repeated in the handler 
initializations in other tests within this file (lines 389, 416, 438, 454, 
474). Please correct it in all occurrences for consistency with the main code.
   
   ```suggestion
           throw_exception_on_empty_results=True,
   ```



##########
sdks/python/apache_beam/transforms/enrichment_handlers/bigquery.py:
##########
@@ -157,10 +162,13 @@ def __enter__(self):
   def _execute_query(self, query: str):
     try:
       results = self.client.query(query=query).result()
+      row_list = [dict(row.items()) for row in results]
+      # return None if no rows found
+      if len(row_list) == 0: return None

Review Comment:
   ![medium](https://www.gstatic.com/codereviewagent/medium-priority.svg)
   
   This check for empty results can be simplified. Using `if not row_list:` is 
a more Pythonic way to check for an empty list. Additionally, the comment is 
redundant as the code is self-explanatory, and placing a statement on the same 
line as `if` is discouraged by PEP 8.
   
   ```suggestion
         if not row_list:
           return None
   ```
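
As a minimal sketch of the idiom, the hypothetical helper `rows_or_none` below (not part of the handler) accepts any iterable of mappings in place of a BigQuery result. Returning the list when it is non-empty is an assumption, since the hunk ends before the original return statement.

```python
def rows_or_none(results):
  """Materialize rows as dicts; return None when no rows came back."""
  row_list = [dict(row.items()) for row in results]
  # An empty list is falsy, so no explicit len() comparison is needed.
  if not row_list:
    return None
  # Assumption: non-empty results are returned as a list of dicts.
  return row_list
```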



##########
sdks/python/apache_beam/transforms/enrichment_handlers/bigquery_it_test.py:
##########
@@ -355,6 +355,129 @@ def test_bigquery_enrichment_with_redis(self):
       assert_that(pcoll_cached, equal_to(expected_rows))
     BigQueryEnrichmentHandler.__call__ = actual
 
+  def test_bigquery_enrichment_no_results_throws_exception(self):
+    requests = [
+        beam.Row(id=999, name='X'),  # This ID does not exist
+    ]
+    handler = BigQueryEnrichmentHandler(
+        project=self.project,
+        row_restriction_template="id = {}",
+        table_name=self.table_name,
+        fields=['id'],
+        throw_execption_on_empty_results=True,
+    )
+
+    with self.assertRaisesRegex(ValueError, "no matching row found for query"):
+      with TestPipeline(is_integration_test=True) as test_pipeline:
+        _ = (test_pipeline | beam.Create(requests) | Enrichment(handler))
+
+  def test_bigquery_enrichment_no_results_graceful(self):
+    requests = [
+        beam.Row(id=999, name='X'),  # This ID does not exist
+        beam.Row(id=1000, name='Y'),  # This ID does not exist
+    ]
+    # When no results are found and not throwing, Enrichment yields original.
+    expected_rows = requests
+
+    handler = BigQueryEnrichmentHandler(
+        project=self.project,
+        row_restriction_template="id = {}",
+        table_name=self.table_name,
+        fields=['id'],
+        min_batch_size=1,
+        max_batch_size=100,
+        throw_execption_on_empty_results=False,
+    )
+
+    with TestPipeline(is_integration_test=True) as test_pipeline:
+      pcoll = (test_pipeline | beam.Create(requests) | Enrichment(handler))
+      assert_that(pcoll, equal_to(expected_rows))
+
+  def test_bigquery_enrichment_no_results_partial_graceful_batched_partial(
+      self):
+    requests = [
+        beam.Row(id=1, name='A'),  # This ID does not exist

Review Comment:
   ![medium](https://www.gstatic.com/codereviewagent/medium-priority.svg)
   
   The comment `# This ID does not exist` is incorrect for `id=1`. According to 
the test data setup, a row with `id=1` exists and is expected to be enriched, 
as confirmed by the `expected_rows`. This misleading comment should be 
corrected to reflect the actual test case.
   
   ```suggestion
           beam.Row(id=1, name='A'),  # This ID exists
   ```



##########
sdks/python/apache_beam/transforms/enrichment_handlers/bigquery.py:
##########
@@ -87,6 +90,7 @@ def __init__(
       query_fn: Optional[QueryFn] = None,
       min_batch_size: int = 1,
       max_batch_size: int = 10000,
+      throw_execption_on_empty_results: bool = True,

Review Comment:
   ![high](https://www.gstatic.com/codereviewagent/high-priority.svg)
   
   There's a typo in the parameter name `throw_execption_on_empty_results`. It 
should be `throw_exception_on_empty_results`. This typo should be corrected 
here and in its subsequent uses within this class on lines 152, 222, and 247 to 
maintain a clean and correct public API.
   
   ```suggestion
         throw_exception_on_empty_results: bool = True,
   ```
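
Because Python rejects unknown keyword arguments, the rename has to land in the signature and at every call site together. A minimal sketch, using a hypothetical `Handler` class rather than the real `BigQueryEnrichmentHandler`:

```python
class Handler:
  """Hypothetical stand-in for a handler whose keyword has been renamed."""
  def __init__(self, throw_exception_on_empty_results: bool = True):
    self.throw_exception_on_empty_results = throw_exception_on_empty_results


def accepts_old_spelling() -> bool:
  """Report whether a call site still using the misspelled name would work."""
  try:
    Handler(throw_execption_on_empty_results=False)  # old, misspelled keyword
  except TypeError:
    # Python raises TypeError for an unexpected keyword argument.
    return False
  return True
```

A call site left on the old spelling fails at construction time, which is why the tests and the class body need to change in the same commit.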



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

