This is an automated email from the ASF dual-hosted git repository.
shunping pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 930b94cceb6 Fix test hang in subprocess expansion service on port bind failure (#38572)
930b94cceb6 is described below
commit 930b94cceb69e33bd9a9a2f1287ebe5c75533536
Author: Shunping Huang <[email protected]>
AuthorDate: Thu May 21 09:11:01 2026 -0400
Fix test hang in subprocess expansion service on port bind failure (#38572)
* Fix silent test hang in subprocess expansion service on port bind failure
* Formatting
* Add retry when starting subprocess server.
* Add sleep before retrying.
---
.../runners/portability/expansion_service_main.py | 14 +++-
sdks/python/apache_beam/utils/subprocess_server.py | 86 ++++++++++++----------
2 files changed, 58 insertions(+), 42 deletions(-)
diff --git a/sdks/python/apache_beam/runners/portability/expansion_service_main.py b/sdks/python/apache_beam/runners/portability/expansion_service_main.py
index 269d02b3efb..f2d03e0e898 100644
--- a/sdks/python/apache_beam/runners/portability/expansion_service_main.py
+++ b/sdks/python/apache_beam/runners/portability/expansion_service_main.py
@@ -55,7 +55,9 @@ def main(argv):
with fully_qualified_named_transform.FullyQualifiedNamedTransform.with_filter(
known_args.fully_qualified_name_glob):
- address = '0.0.0.0:{}'.format(known_args.port)
+ # Bind to localhost instead of 0.0.0.0 to ensure compatibility with loopback
+ # connections on dual-stack (IPv4/IPv6) systems.
+ address = 'localhost:{}'.format(known_args.port)
server = grpc.server(thread_pool_executor.shared_unbounded_instance())
if known_args.serve_loopback_worker:
beam_fn_api_pb2_grpc.add_BeamFnExternalWorkerPoolServicer_to_server(
@@ -71,9 +73,15 @@ def main(argv):
artifact_service.ArtifactRetrievalService(
artifact_service.BeamFilesystemHandler(None).file_reader),
server)
- server.add_insecure_port(address)
+ # Ensure gRPC server successfully binds. If this fails (e.g., due to port collision),
+ # add_insecure_port returns 0. We raise an error to crash the subprocess immediately,
+ # allowing the parent process to detect it and fail fast rather than hanging.
+ bound_port = server.add_insecure_port(address)
+ if not bound_port:
+ raise RuntimeError(
+ "Failed to bind expansion service to {}".format(address))
server.start()
- _LOGGER.info('Listening for expansion requests at %d', known_args.port)
+ _LOGGER.info('Listening for expansion requests at %d', bound_port)
def cleanup(unused_signum, unused_frame):
_LOGGER.info('Shutting down expansion service.')
diff --git a/sdks/python/apache_beam/utils/subprocess_server.py b/sdks/python/apache_beam/utils/subprocess_server.py
index d21cb486b8f..b22e6badb5e 100644
--- a/sdks/python/apache_beam/utils/subprocess_server.py
+++ b/sdks/python/apache_beam/utils/subprocess_server.py
@@ -186,45 +186,53 @@ class SubprocessServer(object):
self.stop()
def start(self):
- try:
- process, endpoint = self.start_process()
- wait_secs = .1
- channel_options = [
- ("grpc.max_receive_message_length", -1),
- ("grpc.max_send_message_length", -1),
- # Default: 20000ms (20s), increased to 10 minutes for stability
- ("grpc.keepalive_timeout_ms", 600_000),
- # Default: 2, set to 0 to allow unlimited pings without data
- ("grpc.http2.max_pings_without_data", 0),
- # Default: False, set to True to allow keepalive pings when no calls
- ("grpc.keepalive_permit_without_calls", True),
- # Default: 2, set to 0 to allow unlimited ping strikes
- ("grpc.http2.max_ping_strikes", 0),
- # Default: 0 (disabled), enable socket reuse for better handling
- ("grpc.so_reuseport", 1),
- ]
- self._grpc_channel = grpc.insecure_channel(
- endpoint, options=channel_options)
- channel_ready = grpc.channel_ready_future(self._grpc_channel)
- while True:
- if process is not None and process.poll() is not None:
- _LOGGER.error("Started job service with %s", process.args)
- raise RuntimeError(
- 'Service failed to start up with error %s' % process.poll())
- try:
- channel_ready.result(timeout=wait_secs)
- break
- except (grpc.FutureTimeoutError, grpc.RpcError):
- wait_secs *= 1.2
- logging.log(
- logging.WARNING if wait_secs > 1 else logging.DEBUG,
- 'Waiting for grpc channel to be ready at %s.',
- endpoint)
- return self._stub_class(self._grpc_channel)
- except: # pylint: disable=bare-except
- _LOGGER.exception("Error bringing up service")
- self.stop()
- raise
+ max_attempts = 3
+ for attempt in range(max_attempts):
+ try:
+ process, endpoint = self.start_process()
+ wait_secs = .1
+ channel_options = [
+ ("grpc.max_receive_message_length", -1),
+ ("grpc.max_send_message_length", -1),
+ # Default: 20000ms (20s), increased to 10 minutes for stability
+ ("grpc.keepalive_timeout_ms", 600_000),
+ # Default: 2, set to 0 to allow unlimited pings without data
+ ("grpc.http2.max_pings_without_data", 0),
+ # Default: False, set to True to allow keepalive pings when no calls
+ ("grpc.keepalive_permit_without_calls", True),
+ # Default: 2, set to 0 to allow unlimited ping strikes
+ ("grpc.http2.max_ping_strikes", 0),
+ # Default: 0 (disabled), enable socket reuse for better handling
+ ("grpc.so_reuseport", 1),
+ ]
+ self._grpc_channel = grpc.insecure_channel(
+ endpoint, options=channel_options)
+ channel_ready = grpc.channel_ready_future(self._grpc_channel)
+ while True:
+ if process is not None and process.poll() is not None:
+ _LOGGER.error("Started job service with %s", process.args)
+ raise RuntimeError(
+ 'Service failed to start up with error %s' % process.poll())
+ try:
+ channel_ready.result(timeout=wait_secs)
+ break
+ except (grpc.FutureTimeoutError, grpc.RpcError):
+ wait_secs *= 1.2
+ logging.log(
+ logging.WARNING if wait_secs > 1 else logging.DEBUG,
+ 'Waiting for grpc channel to be ready at %s.',
+ endpoint)
+ return self._stub_class(self._grpc_channel)
+ except Exception as e:
+ _LOGGER.warning(
+ "Error bringing up service on attempt %d: %s",
+ attempt + 1,
+ e,
+ exc_info=True)
+ self.stop()
+ if attempt == max_attempts - 1:
+ raise
+ time.sleep(1)
def start_process(self):
if self._owner_id is not None: