gemini-code-assist[bot] commented on code in PR #38611:
URL: https://github.com/apache/beam/pull/38611#discussion_r3293716282


##########
sdks/python/apache_beam/runners/portability/expansion_service_main.py:
##########
@@ -73,15 +71,9 @@ def main(argv):
         artifact_service.ArtifactRetrievalService(
             artifact_service.BeamFilesystemHandler(None).file_reader),
         server)
-    # Ensure gRPC server successfully binds. If this fails (e.g., due to port collision),
-    # add_insecure_port returns 0. We raise an error to crash the subprocess immediately,
-    # allowing the parent process to detect it and fail fast rather than hanging.
-    bound_port = server.add_insecure_port(address)
-    if not bound_port:
-      raise RuntimeError(
-          "Failed to bind expansion service to {}".format(address))
+    server.add_insecure_port(address)

Review Comment:
   ![high](https://www.gstatic.com/codereviewagent/high-priority.svg)
   
   It is important to verify that the gRPC server successfully binds to the 
requested port. If `add_insecure_port` fails (e.g., due to a port collision), 
it returns 0. Without this check, the service may appear to start but will not 
be reachable, making debugging difficult. Additionally, capturing the return 
value is necessary to correctly log the port when using ephemeral ports (port 
0).
   
   ```suggestion
       bound_port = server.add_insecure_port(address)
       if not bound_port:
         raise RuntimeError(
             "Failed to bind expansion service to {}".format(address))
   ```



##########
sdks/python/apache_beam/runners/portability/expansion_service_main.py:
##########
@@ -73,15 +71,9 @@ def main(argv):
         artifact_service.ArtifactRetrievalService(
             artifact_service.BeamFilesystemHandler(None).file_reader),
         server)
-    # Ensure gRPC server successfully binds. If this fails (e.g., due to port collision),
-    # add_insecure_port returns 0. We raise an error to crash the subprocess immediately,
-    # allowing the parent process to detect it and fail fast rather than hanging.
-    bound_port = server.add_insecure_port(address)
-    if not bound_port:
-      raise RuntimeError(
-          "Failed to bind expansion service to {}".format(address))
+    server.add_insecure_port(address)
     server.start()
-    _LOGGER.info('Listening for expansion requests at %d', bound_port)
+    _LOGGER.info('Listening for expansion requests at %d', known_args.port)

Review Comment:
   ![high](https://www.gstatic.com/codereviewagent/high-priority.svg)
   
   Please use the actual bound port for logging. If `known_args.port` is 0, 
logging it will show 0 instead of the allocated ephemeral port. This depends on 
capturing `bound_port` from the `add_insecure_port` call.
   
   ```suggestion
       _LOGGER.info('Listening for expansion requests at %d', bound_port)
   ```



##########
sdks/python/apache_beam/utils/subprocess_server.py:
##########
@@ -282,6 +280,20 @@ def stop_process(self):
       finally:
         self._grpc_channel = None
 
+  def stop_force(self):
+    try:
+      self._cache.force_remove(tuple(self._cmd), self._port, self._logger)
+    finally:
+      self._owner_id = None
+    if self._grpc_channel:
+      try:
+        self._grpc_channel.close()
+      except:  # pylint: disable=bare-except

Review Comment:
   ![medium](https://www.gstatic.com/codereviewagent/medium-priority.svg)
   
   Consider catching `Exception` instead of using a bare `except:`. This avoids 
catching and logging system-level interrupts like `KeyboardInterrupt` as errors 
during the channel closing process.
   
   ```suggestion
         except Exception:
   ```



##########
sdks/python/apache_beam/utils/subprocess_server.py:
##########
@@ -186,53 +192,45 @@ def __exit__(self, *unused_args):
     self.stop()
 
   def start(self):
-    max_attempts = 3
-    for attempt in range(max_attempts):
-      try:
-        process, endpoint = self.start_process()
-        wait_secs = .1
-        channel_options = [
-            ("grpc.max_receive_message_length", -1),
-            ("grpc.max_send_message_length", -1),
-            # Default: 20000ms (20s), increased to 10 minutes for stability
-            ("grpc.keepalive_timeout_ms", 600_000),
-            # Default: 2, set to 0 to allow unlimited pings without data
-            ("grpc.http2.max_pings_without_data", 0),
-            # Default: False, set to True to allow keepalive pings when no calls
-            ("grpc.keepalive_permit_without_calls", True),
-            # Default: 2, set to 0 to allow unlimited ping strikes
-            ("grpc.http2.max_ping_strikes", 0),
-            # Default: 0 (disabled), enable socket reuse for better handling
-            ("grpc.so_reuseport", 1),
-        ]
-        self._grpc_channel = grpc.insecure_channel(
-            endpoint, options=channel_options)
-        channel_ready = grpc.channel_ready_future(self._grpc_channel)
-        while True:
-          if process is not None and process.poll() is not None:
-            _LOGGER.error("Started job service with %s", process.args)
-            raise RuntimeError(
-                'Service failed to start up with error %s' % process.poll())
-          try:
-            channel_ready.result(timeout=wait_secs)
-            break
-          except (grpc.FutureTimeoutError, grpc.RpcError):
-            wait_secs *= 1.2
-            logging.log(
-                logging.WARNING if wait_secs > 1 else logging.DEBUG,
-                'Waiting for grpc channel to be ready at %s.',
-                endpoint)
-        return self._stub_class(self._grpc_channel)
-      except Exception as e:
-        _LOGGER.warning(
-            "Error bringing up service on attempt %d: %s",
-            attempt + 1,
-            e,
-            exc_info=True)
-        self.stop()
-        if attempt == max_attempts - 1:
-          raise
-        time.sleep(1)
+    try:
+      process, endpoint = self.start_process()
+      wait_secs = .1
+      channel_options = [
+          ("grpc.max_receive_message_length", -1),
+          ("grpc.max_send_message_length", -1),
+          # Default: 20000ms (20s), increased to 10 minutes for stability
+          ("grpc.keepalive_timeout_ms", 600_000),
+          # Default: 2, set to 0 to allow unlimited pings without data
+          ("grpc.http2.max_pings_without_data", 0),
+          # Default: False, set to True to allow keepalive pings when no calls
+          ("grpc.keepalive_permit_without_calls", True),
+          # Default: 2, set to 0 to allow unlimited ping strikes
+          ("grpc.http2.max_ping_strikes", 0),
+          # Default: 0 (disabled), enable socket reuse for better handling
+          ("grpc.so_reuseport", 1),
+      ]
+      self._grpc_channel = grpc.insecure_channel(
+          endpoint, options=channel_options)
+      channel_ready = grpc.channel_ready_future(self._grpc_channel)
+      while True:
+        if process is not None and process.poll() is not None:
+          _LOGGER.error("Started job service with %s", process.args)
+          raise RuntimeError(
+              'Service failed to start up with error %s' % process.poll())
+        try:
+          channel_ready.result(timeout=wait_secs)
+          break
+        except (grpc.FutureTimeoutError, grpc.RpcError):
+          wait_secs *= 1.2
+          logging.log(
+              logging.WARNING if wait_secs > 1 else logging.DEBUG,
+              'Waiting for grpc channel to be ready at %s.',
+              endpoint)
+      return self._stub_class(self._grpc_channel)
+    except:  # pylint: disable=bare-except
+      _LOGGER.exception("Error bringing up service")
+      self.stop_force()
+      raise

Review Comment:
   ![medium](https://www.gstatic.com/codereviewagent/medium-priority.svg)
   
   Using a bare `except:` block catches all exceptions, including 
`KeyboardInterrupt` and `SystemExit`. Logging these as a "service error" with a 
stack trace is usually undesirable. It is better to catch `Exception` for 
logging purposes, while still ensuring `stop_force()` is called for all base 
exceptions to prevent subprocess leaks.
   
   ```suggestion
       except Exception:
         _LOGGER.exception("Error bringing up service")
         self.stop_force()
         raise
       except:  # pylint: disable=bare-except
         self.stop_force()
         raise
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to