gemini-code-assist[bot] commented on code in PR #36791:
URL: https://github.com/apache/beam/pull/36791#discussion_r2519995712


##########
sdks/python/apache_beam/transforms/enrichment_handlers/bigquery.py:
##########
@@ -204,11 +212,22 @@ def __call__(self, request: Union[beam.Row, 
list[beam.Row]], *args, **kwargs):
       query = raw_query.format(*values)
 
       responses_dict = self._execute_query(query)
-      for response in responses_dict:
-        response_row = beam.Row(**response)
-        response_key = self.create_row_key(response_row)
-        if response_key in requests_map:
-          responses.append((requests_map[response_key], response_row))
+      if responses_dict is not None:
+        for response in responses_dict:
+          response_row = beam.Row(**response)
+          response_key = self.create_row_key(response_row)
+          if response_key in requests_map:
+            responses.append((requests_map[response_key], response_row))
+      if len(responses) < len(request):
+        if self.throw_exception_on_empty_results:
+          raise ValueError(f"no matching row found for query: {query}")
+        else:
+          _LOGGER.warning('no matching row found for query: %s', query)
+          # append empty rows for missing responses
+          found_req_keys = {self.create_row_key(resp[0]) for resp in responses}
+          for req_key, req in requests_map.items():
+            if req_key not in found_req_keys:
+              responses.append((req, beam.Row()))

Review Comment:
   ![medium](https://www.gstatic.com/codereviewagent/medium-priority.svg)
   
   The logic for handling missing responses in batched mode can be simplified 
for better readability and a minor performance improvement. Currently, you 
iterate through the found responses to build a set of keys, and then iterate 
through all requests to find the missing ones.
   
   A cleaner approach would be to keep track of unmatched requests. You can 
start with a copy of all requests and remove them as you find matches. The 
remaining items in your copy will be the unmatched requests. This avoids 
re-calculating keys for matched requests and makes the code easier to follow.
   
   ```suggestion
         responses_dict = self._execute_query(query)
         unmatched_requests = requests_map.copy()
         if responses_dict:
           for response in responses_dict:
             response_row = beam.Row(**response)
             response_key = self.create_row_key(response_row)
             if response_key in unmatched_requests:
               req = unmatched_requests.pop(response_key)
               responses.append((req, response_row))
   
         if unmatched_requests:
           if self.throw_exception_on_empty_results:
             raise ValueError(f"no matching row found for query: {query}")
           else:
             _LOGGER.warning('no matching row found for query: %s', query)
             for req in unmatched_requests.values():
               responses.append((req, beam.Row()))
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to