gemini-code-assist[bot] commented on code in PR #38572:
URL: https://github.com/apache/beam/pull/38572#discussion_r3278341432
##########
sdks/python/apache_beam/utils/subprocess_server.py:
##########
@@ -186,45 +186,52 @@ def __exit__(self, *unused_args):
self.stop()
def start(self):
- try:
- process, endpoint = self.start_process()
- wait_secs = .1
- channel_options = [
- ("grpc.max_receive_message_length", -1),
- ("grpc.max_send_message_length", -1),
- # Default: 20000ms (20s), increased to 10 minutes for stability
- ("grpc.keepalive_timeout_ms", 600_000),
- # Default: 2, set to 0 to allow unlimited pings without data
- ("grpc.http2.max_pings_without_data", 0),
- # Default: False, set to True to allow keepalive pings when no calls
- ("grpc.keepalive_permit_without_calls", True),
- # Default: 2, set to 0 to allow unlimited ping strikes
- ("grpc.http2.max_ping_strikes", 0),
- # Default: 0 (disabled), enable socket reuse for better handling
- ("grpc.so_reuseport", 1),
- ]
- self._grpc_channel = grpc.insecure_channel(
- endpoint, options=channel_options)
- channel_ready = grpc.channel_ready_future(self._grpc_channel)
- while True:
- if process is not None and process.poll() is not None:
- _LOGGER.error("Started job service with %s", process.args)
- raise RuntimeError(
- 'Service failed to start up with error %s' % process.poll())
- try:
- channel_ready.result(timeout=wait_secs)
- break
- except (grpc.FutureTimeoutError, grpc.RpcError):
- wait_secs *= 1.2
- logging.log(
- logging.WARNING if wait_secs > 1 else logging.DEBUG,
- 'Waiting for grpc channel to be ready at %s.',
- endpoint)
- return self._stub_class(self._grpc_channel)
- except: # pylint: disable=bare-except
- _LOGGER.exception("Error bringing up service")
- self.stop()
- raise
+ max_attempts = 3
+ for attempt in range(max_attempts):
+ try:
+ process, endpoint = self.start_process()
+ wait_secs = .1
+ channel_options = [
+ ("grpc.max_receive_message_length", -1),
+ ("grpc.max_send_message_length", -1),
+ # Default: 20000ms (20s), increased to 10 minutes for stability
+ ("grpc.keepalive_timeout_ms", 600_000),
+ # Default: 2, set to 0 to allow unlimited pings without data
+ ("grpc.http2.max_pings_without_data", 0),
+ # Default: False, set to True to allow keepalive pings when no calls
+ ("grpc.keepalive_permit_without_calls", True),
+ # Default: 2, set to 0 to allow unlimited ping strikes
+ ("grpc.http2.max_ping_strikes", 0),
+ # Default: 0 (disabled), enable socket reuse for better handling
+ ("grpc.so_reuseport", 1),
+ ]
+ self._grpc_channel = grpc.insecure_channel(
+ endpoint, options=channel_options)
+ channel_ready = grpc.channel_ready_future(self._grpc_channel)
+ while True:
+ if process is not None and process.poll() is not None:
+ _LOGGER.error("Started job service with %s", process.args)
+ raise RuntimeError(
+ 'Service failed to start up with error %s' % process.poll())
+ try:
+ channel_ready.result(timeout=wait_secs)
+ break
+ except (grpc.FutureTimeoutError, grpc.RpcError):
+ wait_secs *= 1.2
+ logging.log(
+ logging.WARNING if wait_secs > 1 else logging.DEBUG,
+ 'Waiting for grpc channel to be ready at %s.',
+ endpoint)
+ return self._stub_class(self._grpc_channel)
+ except Exception as e:
+ _LOGGER.warning(
+ "Error bringing up service on attempt %d: %s",
+ attempt + 1,
+ e,
+ exc_info=True)
+ self.stop()
+ if attempt == max_attempts - 1:
+ raise
Review Comment:

The retry loop for starting the subprocess service has no delay between
attempts. If the failure is caused by a transient condition such as a port
collision (e.g., the port is still in a `TIME_WAIT` state from a previous run),
retrying three times back-to-back is likely to fail every attempt in quick
succession. Adding a short delay (e.g., 1 second) between retries would make
this retry mechanism considerably more effective.
```suggestion
except Exception as e:
_LOGGER.warning(
"Error bringing up service on attempt %d: %s",
attempt + 1,
e,
exc_info=True)
self.stop()
if attempt == max_attempts - 1:
raise
import time
time.sleep(1)
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]