This is an automated email from the ASF dual-hosted git repository.
shunping pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 361c2c473a8 Reduce python expansion service startup time (#38611)
361c2c473a8 is described below
commit 361c2c473a8eb6ebddfc3211f78553b521fdfa54
Author: Shunping Huang <[email protected]>
AuthorDate: Sun May 24 08:12:52 2026 -0400
Reduce python expansion service startup time (#38611)
* Revert "Fix test hang in subprocess expansion service on port bind
failure (#38572)"
This reverts commit 930b94cceb69e33bd9a9a2f1287ebe5c75533536.
* Ensure immediate cleanup of subprocess server on start failure
When a SubprocessServer fails to start (e.g., due to a process exit or
startup error), the server process could leak if standard purging
is blocked by other active owners sharing the cached subprocess.
To fix this:
- Implement `_SharedCache.force_remove()` to immediately remove a key
from the cache and run its destructor regardless of active owners.
- Add `SubprocessServer.stop_force()` which calls `force_remove()` to
completely terminate the server's process.
- Call `stop_force()` in the `except` block of `SubprocessServer.start()`, so
the subprocess is terminated before the startup error is re-raised.
* Support modern manylinux tags based on pip version in Stager
This lets us download pre-built wheels for environment staging instead of
building from source tarballs, which is sometimes slow.
* Formatting
* Trigger more python tests.
* Typo
---
.../beam_PostCommit_Python_Versions.json | 2 +-
.../runners/portability/expansion_service_main.py | 14 +--
.../apache_beam/runners/portability/stager.py | 8 +-
sdks/python/apache_beam/utils/subprocess_server.py | 106 ++++++++++++---------
.../apache_beam/utils/subprocess_server_test.py | 87 +++++++++++++++++
5 files changed, 155 insertions(+), 62 deletions(-)
diff --git a/.github/trigger_files/beam_PostCommit_Python_Versions.json b/.github/trigger_files/beam_PostCommit_Python_Versions.json
index 541dc4ea8e8..8ed972c9f57 100644
--- a/.github/trigger_files/beam_PostCommit_Python_Versions.json
+++ b/.github/trigger_files/beam_PostCommit_Python_Versions.json
@@ -1,4 +1,4 @@
{
"comment": "Modify this file in a trivial way to cause this test suite to run",
- "revision": 2
+ "revision": 3
}
diff --git a/sdks/python/apache_beam/runners/portability/expansion_service_main.py b/sdks/python/apache_beam/runners/portability/expansion_service_main.py
index f2d03e0e898..269d02b3efb 100644
--- a/sdks/python/apache_beam/runners/portability/expansion_service_main.py
+++ b/sdks/python/apache_beam/runners/portability/expansion_service_main.py
@@ -55,9 +55,7 @@ def main(argv):
with fully_qualified_named_transform.FullyQualifiedNamedTransform.with_filter(
known_args.fully_qualified_name_glob):
- # Bind to localhost instead of 0.0.0.0 to ensure compatibility with loopback
- # connections on dual-stack (IPv4/IPv6) systems.
- address = 'localhost:{}'.format(known_args.port)
+ address = '0.0.0.0:{}'.format(known_args.port)
server = grpc.server(thread_pool_executor.shared_unbounded_instance())
if known_args.serve_loopback_worker:
beam_fn_api_pb2_grpc.add_BeamFnExternalWorkerPoolServicer_to_server(
@@ -73,15 +71,9 @@ def main(argv):
artifact_service.ArtifactRetrievalService(
artifact_service.BeamFilesystemHandler(None).file_reader),
server)
- # Ensure gRPC server successfully binds. If this fails (e.g., due to port collision),
- # add_insecure_port returns 0. We raise an error to crash the subprocess immediately,
- # allowing the parent process to detect it and fail fast rather than hanging.
- bound_port = server.add_insecure_port(address)
- if not bound_port:
- raise RuntimeError(
- "Failed to bind expansion service to {}".format(address))
+ server.add_insecure_port(address)
server.start()
- _LOGGER.info('Listening for expansion requests at %d', bound_port)
+ _LOGGER.info('Listening for expansion requests at %d', known_args.port)
def cleanup(unused_signum, unused_frame):
_LOGGER.info('Shutting down expansion service.')
diff --git a/sdks/python/apache_beam/runners/portability/stager.py b/sdks/python/apache_beam/runners/portability/stager.py
index e862fde4efe..136c320da00 100644
--- a/sdks/python/apache_beam/runners/portability/stager.py
+++ b/sdks/python/apache_beam/runners/portability/stager.py
@@ -732,9 +732,11 @@ class Stager(object):
# addressed, download wheel based on glibc version in Beam's Python
# Base image
pip_version = distribution('pip').version
- if version.parse(pip_version) >= version.parse('19.3'):
- # pip can only recognize manylinux2014_x86_64 wheels
- # from version 19.3.
+ # See more information about manylinux at
+ # https://github.com/pypa/manylinux
+ if version.parse(pip_version) >= version.parse('20.3'):
+ return 'manylinux_2_28_x86_64'
+ elif version.parse(pip_version) >= version.parse('19.3'):
return 'manylinux2014_x86_64'
else:
return 'manylinux2010_x86_64'
diff --git a/sdks/python/apache_beam/utils/subprocess_server.py b/sdks/python/apache_beam/utils/subprocess_server.py
index b22e6badb5e..0b09b364362 100644
--- a/sdks/python/apache_beam/utils/subprocess_server.py
+++ b/sdks/python/apache_beam/utils/subprocess_server.py
@@ -118,6 +118,12 @@ class _SharedCache:
self._cache[key].owners.add(owner)
return self._cache[key].obj
+ def force_remove(self, *key):
+ with self._lock:
+ entry = self._cache.pop(key, None)
+ if entry is not None:
+ self._destructor(entry.obj)
+
class JavaHelper:
@classmethod
@@ -186,53 +192,45 @@ class SubprocessServer(object):
self.stop()
def start(self):
- max_attempts = 3
- for attempt in range(max_attempts):
- try:
- process, endpoint = self.start_process()
- wait_secs = .1
- channel_options = [
- ("grpc.max_receive_message_length", -1),
- ("grpc.max_send_message_length", -1),
- # Default: 20000ms (20s), increased to 10 minutes for stability
- ("grpc.keepalive_timeout_ms", 600_000),
- # Default: 2, set to 0 to allow unlimited pings without data
- ("grpc.http2.max_pings_without_data", 0),
- # Default: False, set to True to allow keepalive pings when no calls
- ("grpc.keepalive_permit_without_calls", True),
- # Default: 2, set to 0 to allow unlimited ping strikes
- ("grpc.http2.max_ping_strikes", 0),
- # Default: 0 (disabled), enable socket reuse for better handling
- ("grpc.so_reuseport", 1),
- ]
- self._grpc_channel = grpc.insecure_channel(
- endpoint, options=channel_options)
- channel_ready = grpc.channel_ready_future(self._grpc_channel)
- while True:
- if process is not None and process.poll() is not None:
- _LOGGER.error("Started job service with %s", process.args)
- raise RuntimeError(
- 'Service failed to start up with error %s' % process.poll())
- try:
- channel_ready.result(timeout=wait_secs)
- break
- except (grpc.FutureTimeoutError, grpc.RpcError):
- wait_secs *= 1.2
- logging.log(
- logging.WARNING if wait_secs > 1 else logging.DEBUG,
- 'Waiting for grpc channel to be ready at %s.',
- endpoint)
- return self._stub_class(self._grpc_channel)
- except Exception as e:
- _LOGGER.warning(
- "Error bringing up service on attempt %d: %s",
- attempt + 1,
- e,
- exc_info=True)
- self.stop()
- if attempt == max_attempts - 1:
- raise
- time.sleep(1)
+ try:
+ process, endpoint = self.start_process()
+ wait_secs = .1
+ channel_options = [
+ ("grpc.max_receive_message_length", -1),
+ ("grpc.max_send_message_length", -1),
+ # Default: 20000ms (20s), increased to 10 minutes for stability
+ ("grpc.keepalive_timeout_ms", 600_000),
+ # Default: 2, set to 0 to allow unlimited pings without data
+ ("grpc.http2.max_pings_without_data", 0),
+ # Default: False, set to True to allow keepalive pings when no calls
+ ("grpc.keepalive_permit_without_calls", True),
+ # Default: 2, set to 0 to allow unlimited ping strikes
+ ("grpc.http2.max_ping_strikes", 0),
+ # Default: 0 (disabled), enable socket reuse for better handling
+ ("grpc.so_reuseport", 1),
+ ]
+ self._grpc_channel = grpc.insecure_channel(
+ endpoint, options=channel_options)
+ channel_ready = grpc.channel_ready_future(self._grpc_channel)
+ while True:
+ if process is not None and process.poll() is not None:
+ _LOGGER.error("Started job service with %s", process.args)
+ raise RuntimeError(
+ 'Service failed to start up with error %s' % process.poll())
+ try:
+ channel_ready.result(timeout=wait_secs)
+ break
+ except (grpc.FutureTimeoutError, grpc.RpcError):
+ wait_secs *= 1.2
+ logging.log(
+ logging.WARNING if wait_secs > 1 else logging.DEBUG,
+ 'Waiting for grpc channel to be ready at %s.',
+ endpoint)
+ return self._stub_class(self._grpc_channel)
+ except: # pylint: disable=bare-except
+ _LOGGER.exception("Error bringing up service")
+ self.stop_force()
+ raise
def start_process(self):
if self._owner_id is not None:
@@ -282,6 +280,20 @@ class SubprocessServer(object):
finally:
self._grpc_channel = None
+ def stop_force(self):
+ try:
+ self._cache.force_remove(tuple(self._cmd), self._port, self._logger)
+ finally:
+ self._owner_id = None
+ if self._grpc_channel:
+ try:
+ self._grpc_channel.close()
+ except: # pylint: disable=bare-except
+ _LOGGER.error(
+ "Could not close the gRPC channel started with cmd %s", self._cmd)
+ finally:
+ self._grpc_channel = None
+
def _really_stop_process(process_and_endpoint):
process, _ = process_and_endpoint # pylint: disable=unpacking-non-sequence
if not process:
diff --git a/sdks/python/apache_beam/utils/subprocess_server_test.py b/sdks/python/apache_beam/utils/subprocess_server_test.py
index 073b8b3bcbe..a44b89b17e3 100644
--- a/sdks/python/apache_beam/utils/subprocess_server_test.py
+++ b/sdks/python/apache_beam/utils/subprocess_server_test.py
@@ -464,6 +464,93 @@ class CacheTest(unittest.TestCase):
# without raising ValueError.
server.stop_process()
+ def test_force_remove(self):
+ destructor_calls = []
+
+ def custom_destructor(obj):
+ destructor_calls.append(obj)
+
+ cache = subprocess_server._SharedCache(self.with_prefix, custom_destructor)
+
+ owner1 = cache.register()
+ owner2 = cache.register()
+
+ # Get object 'a' under both active owners
+ a = cache.get('a')
+ self.assertEqual(a[0], 'a')
+ self.assertIn(('a', ), cache._cache)
+
+ # force_remove on a non-existent key should be a safe no-op
+ cache.force_remove('non_existent')
+
+ # Call force_remove, which should bypass the owners check and delete it immediately
+ cache.force_remove('a')
+
+ # The cache entry should be gone
+ self.assertNotIn(('a', ), cache._cache)
+
+ # Destructor must be called on 'a'
+ self.assertEqual(destructor_calls, [a])
+
+ # Retrieving 'a' again under the active owners should construct a new object
+ new_a = cache.get('a')
+ self.assertNotEqual(new_a, a)
+ self.assertEqual(new_a[0], 'a')
+
+ # Clean up
+ cache.purge(owner1)
+ cache.purge(owner2)
+
+ def test_subprocess_server_start_failed_no_leak(self):
+ destructor_calls = []
+
+ def custom_destructor(obj):
+ destructor_calls.append(obj)
+
+ class DummyProcess:
+ def __init__(self):
+ self.args = ["dummy_cmd"]
+
+ def poll(self):
+ return 1 # Simulate that process exited/failed
+
+ dummy_process = DummyProcess()
+ cache = subprocess_server._SharedCache(
+ lambda *args: (dummy_process, "localhost:12345"), custom_destructor)
+
+ # 1. Register an independent, unrelated owner in the cache first.
+ other_owner = cache.register()
+
+ class CustomServer(subprocess_server.SubprocessServer):
+ _cache = cache
+
+ def __init__(self):
+ super().__init__(lambda channel: None, ["dummy_cmd"], port=12345)
+
+ server = CustomServer()
+ # Fetch the process using other_owner, creating the cache entry and registering other_owner on it.
+ cache.get(tuple(server._cmd), server._port, server._logger)
+
+ cache_key = (tuple(server._cmd), server._port, server._logger)
+ self.assertIn(cache_key, cache._cache)
+ self.assertEqual(cache._cache[cache_key].owners, {other_owner})
+
+ # 2. Verify starting the server (which registers its own owner and retrieves from cache) raises RuntimeError
+ with self.assertRaises(RuntimeError):
+ server.start()
+
+ # 3. Verify that the destructor was called on the process, meaning no leak (even though other_owner was still registered!)
+ self.assertEqual(destructor_calls, [(dummy_process, "localhost:12345")])
+
+ # 4. Verify that the server has cleaned up its owner_id
+ self.assertIsNone(server._owner_id)
+
+ # 5. Verify the cache entry has been removed completely
+ self.assertNotIn(cache_key, cache._cache)
+
+ # Clean up the other owner
+ cache.purge(other_owner)
+
if __name__ == '__main__':
unittest.main()