gemini-code-assist[bot] commented on code in PR #38572:
URL: https://github.com/apache/beam/pull/38572#discussion_r3278341432


##########
sdks/python/apache_beam/utils/subprocess_server.py:
##########
@@ -186,45 +186,52 @@ def __exit__(self, *unused_args):
     self.stop()
 
   def start(self):
-    try:
-      process, endpoint = self.start_process()
-      wait_secs = .1
-      channel_options = [
-          ("grpc.max_receive_message_length", -1),
-          ("grpc.max_send_message_length", -1),
-          # Default: 20000ms (20s), increased to 10 minutes for stability
-          ("grpc.keepalive_timeout_ms", 600_000),
-          # Default: 2, set to 0 to allow unlimited pings without data
-          ("grpc.http2.max_pings_without_data", 0),
-          # Default: False, set to True to allow keepalive pings when no calls
-          ("grpc.keepalive_permit_without_calls", True),
-          # Default: 2, set to 0 to allow unlimited ping strikes
-          ("grpc.http2.max_ping_strikes", 0),
-          # Default: 0 (disabled), enable socket reuse for better handling
-          ("grpc.so_reuseport", 1),
-      ]
-      self._grpc_channel = grpc.insecure_channel(
-          endpoint, options=channel_options)
-      channel_ready = grpc.channel_ready_future(self._grpc_channel)
-      while True:
-        if process is not None and process.poll() is not None:
-          _LOGGER.error("Started job service with %s", process.args)
-          raise RuntimeError(
-              'Service failed to start up with error %s' % process.poll())
-        try:
-          channel_ready.result(timeout=wait_secs)
-          break
-        except (grpc.FutureTimeoutError, grpc.RpcError):
-          wait_secs *= 1.2
-          logging.log(
-              logging.WARNING if wait_secs > 1 else logging.DEBUG,
-              'Waiting for grpc channel to be ready at %s.',
-              endpoint)
-      return self._stub_class(self._grpc_channel)
-    except:  # pylint: disable=bare-except
-      _LOGGER.exception("Error bringing up service")
-      self.stop()
-      raise
+    max_attempts = 3
+    for attempt in range(max_attempts):
+      try:
+        process, endpoint = self.start_process()
+        wait_secs = .1
+        channel_options = [
+            ("grpc.max_receive_message_length", -1),
+            ("grpc.max_send_message_length", -1),
+            # Default: 20000ms (20s), increased to 10 minutes for stability
+            ("grpc.keepalive_timeout_ms", 600_000),
+            # Default: 2, set to 0 to allow unlimited pings without data
+            ("grpc.http2.max_pings_without_data", 0),
+            # Default: False, set to True to allow keepalive pings when no calls
+            ("grpc.keepalive_permit_without_calls", True),
+            # Default: 2, set to 0 to allow unlimited ping strikes
+            ("grpc.http2.max_ping_strikes", 0),
+            # Default: 0 (disabled), enable socket reuse for better handling
+            ("grpc.so_reuseport", 1),
+        ]
+        self._grpc_channel = grpc.insecure_channel(
+            endpoint, options=channel_options)
+        channel_ready = grpc.channel_ready_future(self._grpc_channel)
+        while True:
+          if process is not None and process.poll() is not None:
+            _LOGGER.error("Started job service with %s", process.args)
+            raise RuntimeError(
+                'Service failed to start up with error %s' % process.poll())
+          try:
+            channel_ready.result(timeout=wait_secs)
+            break
+          except (grpc.FutureTimeoutError, grpc.RpcError):
+            wait_secs *= 1.2
+            logging.log(
+                logging.WARNING if wait_secs > 1 else logging.DEBUG,
+                'Waiting for grpc channel to be ready at %s.',
+                endpoint)
+        return self._stub_class(self._grpc_channel)
+      except Exception as e:
+        _LOGGER.warning(
+            "Error bringing up service on attempt %d: %s",
+            attempt + 1,
+            e,
+            exc_info=True)
+        self.stop()
+        if attempt == max_attempts - 1:
+          raise

Review Comment:
   ![medium](https://www.gstatic.com/codereviewagent/medium-priority.svg)
   
   The retry loop for starting the subprocess service lacks a delay between 
attempts. If the failure is due to a transient condition like a port collision 
(e.g., where the port is still in a `TIME_WAIT` state from a previous run), 
retrying immediately 3 times will likely fail all attempts within milliseconds. 
Adding a small delay (e.g., 1 second) between retries would significantly 
improve the effectiveness of this retry mechanism.
   
   ```suggestion
         except Exception as e:
           _LOGGER.warning(
               "Error bringing up service on attempt %d: %s",
               attempt + 1,
               e,
               exc_info=True)
           self.stop()
           if attempt == max_attempts - 1:
             raise
           import time
           time.sleep(1)
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to