gemini-code-assist[bot] commented on code in PR #38581:
URL: https://github.com/apache/beam/pull/38581#discussion_r3282916770


##########
sdks/python/apache_beam/runners/worker/data_plane.py:
##########
@@ -478,9 +491,38 @@ def __init__(self, data_buffer_time_limit_ms=0):
 
   def close(self):
     # type: () -> None
-    self._to_send.put(self._WRITES_FINISHED, 0)
+    self._enqueue_to_send(self._WRITES_FINISHED)
+    if self._send_forwarder is not None:
+      self._send_forwarder.join()
     self._closed = True
 
+  def _start_send_forwarder(self):
+    # type: () -> None
+    forwarder = threading.Thread(
+        target=self._forward_pending_to_send,
+        name='forward_grpc_outputs')
+    forwarder.daemon = True
+    forwarder.start()
+    self._send_forwarder = forwarder
+
+  def _enqueue_to_send(self, elem):
+    # type: (DataOrTimers) -> None
+    self._pending_send.put(elem, self._get_element_size_bytes(elem))
+
+  def _forward_pending_to_send(self):
+    # type: () -> None
+    try:
+      while True:
+        elem = self._pending_send.get()
+        self._to_send.put(elem, self._get_element_size_bytes(elem))
+        if elem is self._WRITES_FINISHED:
+          return
+    except Exception as e:
+      if not self._closed:
+        _LOGGER.exception('Failed to forward outputs in the data plane.')
+        self._exception = e
+      raise

Review Comment:
   ![medium](https://www.gstatic.com/codereviewagent/medium-priority.svg)
   
   `_forward_pending_to_send` recomputes the size of each element with 
`self._get_element_size_bytes(elem)`, even though `_enqueue_to_send` has 
already computed it when putting the element into `_pending_send`. The 
second call is redundant and, for large elements, may add measurable 
overhead on the send path. Consider passing an `(element, size)` tuple 
through `_pending_send` so the size is computed only once. Note that 
`_enqueue_to_send` would then have to enqueue the matching tuple; the 
suggestion covers only the forwarder side.
   
   ```suggestion
     def _forward_pending_to_send(self):
       # type: () -> None
       try:
         while True:
           elem, size = self._pending_send.get()
           self._to_send.put(elem, size)
           if elem is self._WRITES_FINISHED:
             return
       except Exception as e:
         if not self._closed:
           _LOGGER.exception('Failed to forward outputs in the data plane.')
           self._exception = e
         raise
   ```
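
   To illustrate the idea outside of Beam, here is a minimal, self-contained 
sketch of both sides of the change. It assumes plain `queue.Queue` objects 
rather than the size-aware queues used in `data_plane.py`, and the names 
`Forwarder`, `enqueue`, `size_calls`, and `WRITES_FINISHED` are hypothetical 
stand-ins, not Beam APIs. The counter only exists to show that the size is 
computed once per element.

```python
import queue
import threading

# Hypothetical sentinel standing in for _WRITES_FINISHED.
WRITES_FINISHED = object()


class Forwarder:
  """Toy stand-in for the data channel; not Beam's actual class."""
  def __init__(self):
    self.pending = queue.Queue()
    self.to_send = queue.Queue()
    self.size_calls = 0  # counts size computations, for illustration only
    self._thread = threading.Thread(target=self._forward, daemon=True)
    self._thread.start()

  def _size(self, elem):
    self.size_calls += 1
    return 0 if elem is WRITES_FINISHED else len(elem)

  def enqueue(self, elem):
    # Producer side: compute the size once and ship it with the element.
    self.pending.put((elem, self._size(elem)))

  def _forward(self):
    while True:
      # Consumer side: unpack the precomputed size instead of recomputing it.
      elem, size = self.pending.get()
      self.to_send.put((elem, size))
      if elem is WRITES_FINISHED:
        return

  def close(self):
    self.enqueue(WRITES_FINISHED)
    self._thread.join()
```

   In the real change, the producer would presumably still pass the size as 
the second argument to `_pending_send.put`, since that queue appears to use 
it for its own accounting.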



##########
sdks/python/apache_beam/runners/worker/data_plane.py:
##########
@@ -478,9 +491,38 @@ def __init__(self, data_buffer_time_limit_ms=0):
 
   def close(self):
     # type: () -> None
-    self._to_send.put(self._WRITES_FINISHED, 0)
+    self._enqueue_to_send(self._WRITES_FINISHED)
+    if self._send_forwarder is not None:
+      self._send_forwarder.join()
     self._closed = True

Review Comment:
   ![high](https://www.gstatic.com/codereviewagent/high-priority.svg)
   
   `close` joins the `_send_forwarder` thread but never checks whether that 
thread failed. The forwarder records its exception in `self._exception`, so 
if it dies, some data may never be sent and the caller is not notified. 
Check `self._exception` after the join and re-raise it if it is set.
   
   ```suggestion
     def close(self):
       # type: () -> None
       self._enqueue_to_send(self._WRITES_FINISHED)
       if self._send_forwarder is not None:
         self._send_forwarder.join()
       if self._exception:
         raise self._exception
       self._closed = True
   ```
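
   The general pattern (record the failure in the worker thread, surface it 
from the joining thread) can be sketched on its own as follows. This is an 
illustration under assumptions, not the PR's code: the `Worker` class and 
its `work` callable are hypothetical, and unlike the suggestion above it 
marks the object closed before raising, which is a judgment call about 
whether a failed close should still count as closed.

```python
import threading


class Worker:
  """Toy stand-in for an object that owns a background thread."""
  def __init__(self, work):
    self._exception = None
    self._closed = False
    self._work = work  # hypothetical callable run in the background
    self._thread = threading.Thread(target=self._run, daemon=True)
    self._thread.start()

  def _run(self):
    try:
      self._work()
    except Exception as e:
      # Record the failure so close() can report it to the caller.
      self._exception = e

  def close(self):
    self._thread.join()
    # Mark closed first so the state is consistent even when we raise.
    self._closed = True
    if self._exception is not None:
      raise self._exception
```

   A caller that wraps `close()` in `try`/`except` then sees the background 
failure instead of a silent partial send.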


