gemini-code-assist[bot] commented on code in PR #38611:
URL: https://github.com/apache/beam/pull/38611#discussion_r3293716282
##########
sdks/python/apache_beam/runners/portability/expansion_service_main.py:
##########
@@ -73,15 +71,9 @@ def main(argv):
artifact_service.ArtifactRetrievalService(
artifact_service.BeamFilesystemHandler(None).file_reader),
server)
- # Ensure gRPC server successfully binds. If this fails (e.g., due to port collision),
- # add_insecure_port returns 0. We raise an error to crash the subprocess immediately,
- # allowing the parent process to detect it and fail fast rather than hanging.
- bound_port = server.add_insecure_port(address)
- if not bound_port:
- raise RuntimeError(
- "Failed to bind expansion service to {}".format(address))
+ server.add_insecure_port(address)
Review Comment:

It is important to verify that the gRPC server successfully binds to the
requested port. If `add_insecure_port` fails (e.g., due to a port collision),
it returns 0. Without this check, the service may appear to start but will not
be reachable, making debugging difficult. Additionally, capturing the return
value is necessary to correctly log the port when using ephemeral ports (port
0).
```suggestion
bound_port = server.add_insecure_port(address)
if not bound_port:
raise RuntimeError(
"Failed to bind expansion service to {}".format(address))
```
##########
sdks/python/apache_beam/runners/portability/expansion_service_main.py:
##########
@@ -73,15 +71,9 @@ def main(argv):
artifact_service.ArtifactRetrievalService(
artifact_service.BeamFilesystemHandler(None).file_reader),
server)
- # Ensure gRPC server successfully binds. If this fails (e.g., due to port collision),
- # add_insecure_port returns 0. We raise an error to crash the subprocess immediately,
- # allowing the parent process to detect it and fail fast rather than hanging.
- bound_port = server.add_insecure_port(address)
- if not bound_port:
- raise RuntimeError(
- "Failed to bind expansion service to {}".format(address))
+ server.add_insecure_port(address)
server.start()
- _LOGGER.info('Listening for expansion requests at %d', bound_port)
+ _LOGGER.info('Listening for expansion requests at %d', known_args.port)
Review Comment:

Please use the actual bound port for logging. If `known_args.port` is 0,
logging it will show 0 instead of the allocated ephemeral port. This depends on
capturing `bound_port` from the `add_insecure_port` call.
```suggestion
_LOGGER.info('Listening for expansion requests at %d', bound_port)
```
##########
sdks/python/apache_beam/utils/subprocess_server.py:
##########
@@ -282,6 +280,20 @@ def stop_process(self):
finally:
self._grpc_channel = None
+ def stop_force(self):
+ try:
+ self._cache.force_remove(tuple(self._cmd), self._port, self._logger)
+ finally:
+ self._owner_id = None
+ if self._grpc_channel:
+ try:
+ self._grpc_channel.close()
+ except: # pylint: disable=bare-except
Review Comment:

Consider catching `Exception` instead of using a bare `except:`. A bare
`except:` also catches `BaseException` subclasses such as `KeyboardInterrupt`
and `SystemExit`, so an interrupt raised while the channel is being closed
would be caught and logged as an ordinary error.
```suggestion
except Exception:
```
##########
sdks/python/apache_beam/utils/subprocess_server.py:
##########
@@ -186,53 +192,45 @@ def __exit__(self, *unused_args):
self.stop()
def start(self):
- max_attempts = 3
- for attempt in range(max_attempts):
- try:
- process, endpoint = self.start_process()
- wait_secs = .1
- channel_options = [
- ("grpc.max_receive_message_length", -1),
- ("grpc.max_send_message_length", -1),
- # Default: 20000ms (20s), increased to 10 minutes for stability
- ("grpc.keepalive_timeout_ms", 600_000),
- # Default: 2, set to 0 to allow unlimited pings without data
- ("grpc.http2.max_pings_without_data", 0),
- # Default: False, set to True to allow keepalive pings when no calls
- ("grpc.keepalive_permit_without_calls", True),
- # Default: 2, set to 0 to allow unlimited ping strikes
- ("grpc.http2.max_ping_strikes", 0),
- # Default: 0 (disabled), enable socket reuse for better handling
- ("grpc.so_reuseport", 1),
- ]
- self._grpc_channel = grpc.insecure_channel(
- endpoint, options=channel_options)
- channel_ready = grpc.channel_ready_future(self._grpc_channel)
- while True:
- if process is not None and process.poll() is not None:
- _LOGGER.error("Started job service with %s", process.args)
- raise RuntimeError(
- 'Service failed to start up with error %s' % process.poll())
- try:
- channel_ready.result(timeout=wait_secs)
- break
- except (grpc.FutureTimeoutError, grpc.RpcError):
- wait_secs *= 1.2
- logging.log(
- logging.WARNING if wait_secs > 1 else logging.DEBUG,
- 'Waiting for grpc channel to be ready at %s.',
- endpoint)
- return self._stub_class(self._grpc_channel)
- except Exception as e:
- _LOGGER.warning(
- "Error bringing up service on attempt %d: %s",
- attempt + 1,
- e,
- exc_info=True)
- self.stop()
- if attempt == max_attempts - 1:
- raise
- time.sleep(1)
+ try:
+ process, endpoint = self.start_process()
+ wait_secs = .1
+ channel_options = [
+ ("grpc.max_receive_message_length", -1),
+ ("grpc.max_send_message_length", -1),
+ # Default: 20000ms (20s), increased to 10 minutes for stability
+ ("grpc.keepalive_timeout_ms", 600_000),
+ # Default: 2, set to 0 to allow unlimited pings without data
+ ("grpc.http2.max_pings_without_data", 0),
+ # Default: False, set to True to allow keepalive pings when no calls
+ ("grpc.keepalive_permit_without_calls", True),
+ # Default: 2, set to 0 to allow unlimited ping strikes
+ ("grpc.http2.max_ping_strikes", 0),
+ # Default: 0 (disabled), enable socket reuse for better handling
+ ("grpc.so_reuseport", 1),
+ ]
+ self._grpc_channel = grpc.insecure_channel(
+ endpoint, options=channel_options)
+ channel_ready = grpc.channel_ready_future(self._grpc_channel)
+ while True:
+ if process is not None and process.poll() is not None:
+ _LOGGER.error("Started job service with %s", process.args)
+ raise RuntimeError(
+ 'Service failed to start up with error %s' % process.poll())
+ try:
+ channel_ready.result(timeout=wait_secs)
+ break
+ except (grpc.FutureTimeoutError, grpc.RpcError):
+ wait_secs *= 1.2
+ logging.log(
+ logging.WARNING if wait_secs > 1 else logging.DEBUG,
+ 'Waiting for grpc channel to be ready at %s.',
+ endpoint)
+ return self._stub_class(self._grpc_channel)
+ except: # pylint: disable=bare-except
+ _LOGGER.exception("Error bringing up service")
+ self.stop_force()
+ raise
Review Comment:

Using a bare `except:` block catches all exceptions, including
`KeyboardInterrupt` and `SystemExit`. Logging these as a "service error" with a
stack trace is usually undesirable. It is better to catch `Exception` for
logging purposes, while still ensuring `stop_force()` is called for all base
exceptions to prevent subprocess leaks.
```suggestion
except Exception:
_LOGGER.exception("Error bringing up service")
self.stop_force()
raise
except: # pylint: disable=bare-except
self.stop_force()
raise
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]