This is an automated email from the ASF dual-hosted git repository.

shunping pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 361c2c473a8 Reduce python expansion service startup time (#38611)
361c2c473a8 is described below

commit 361c2c473a8eb6ebddfc3211f78553b521fdfa54
Author: Shunping Huang <[email protected]>
AuthorDate: Sun May 24 08:12:52 2026 -0400

    Reduce python expansion service startup time (#38611)
    
    * Revert "Fix test hang in subprocess expansion service on port bind failure (#38572)"
    
    This reverts commit 930b94cceb69e33bd9a9a2f1287ebe5c75533536.
    
    * Ensure immediate cleanup of subprocess server on start failure
    
    When a SubprocessServer fails to start (e.g., because the process exits
    early or another startup error occurs), its server process could leak:
    the normal purge does not tear the process down while other active
    owners still share the cached subprocess.
    
    To fix this:
    - Implement `_SharedCache.force_remove()` to immediately remove a key
      from the cache and run its destructor regardless of active owners.
    - Add `SubprocessServer.stop_force()` which calls `force_remove()` to
      completely terminate the server's process.
    - Call `stop_force()` in the `except` block of `SubprocessServer.start()`
      so that a failed start always terminates the server process.
    
    * Support modern manylinux tags based on pip version in Stager
    
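    This lets us download pre-built wheels for environment staging instead
    of building packages from source tarballs, which can be slow. With pip
    20.3 or newer we now request `manylinux_2_28_x86_64` wheels. Older pip
    versions still fall back to `manylinux2014_x86_64` (pip >= 19.3) or
    `manylinux2010_x86_64`.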
    
    * Formatting
    
    * Trigger more Python tests.
    
    * Typo
---
 .../beam_PostCommit_Python_Versions.json           |   2 +-
 .../runners/portability/expansion_service_main.py  |  14 +--
 .../apache_beam/runners/portability/stager.py      |   8 +-
 sdks/python/apache_beam/utils/subprocess_server.py | 106 ++++++++++++---------
 .../apache_beam/utils/subprocess_server_test.py    |  87 +++++++++++++++++
 5 files changed, 155 insertions(+), 62 deletions(-)

diff --git a/.github/trigger_files/beam_PostCommit_Python_Versions.json 
b/.github/trigger_files/beam_PostCommit_Python_Versions.json
index 541dc4ea8e8..8ed972c9f57 100644
--- a/.github/trigger_files/beam_PostCommit_Python_Versions.json
+++ b/.github/trigger_files/beam_PostCommit_Python_Versions.json
@@ -1,4 +1,4 @@
 {
   "comment": "Modify this file in a trivial way to cause this test suite to 
run",
-  "revision": 2
+  "revision": 3
 }
diff --git 
a/sdks/python/apache_beam/runners/portability/expansion_service_main.py 
b/sdks/python/apache_beam/runners/portability/expansion_service_main.py
index f2d03e0e898..269d02b3efb 100644
--- a/sdks/python/apache_beam/runners/portability/expansion_service_main.py
+++ b/sdks/python/apache_beam/runners/portability/expansion_service_main.py
@@ -55,9 +55,7 @@ def main(argv):
   with 
fully_qualified_named_transform.FullyQualifiedNamedTransform.with_filter(
       known_args.fully_qualified_name_glob):
 
-    # Bind to localhost instead of 0.0.0.0 to ensure compatibility with 
loopback
-    # connections on dual-stack (IPv4/IPv6) systems.
-    address = 'localhost:{}'.format(known_args.port)
+    address = '0.0.0.0:{}'.format(known_args.port)
     server = grpc.server(thread_pool_executor.shared_unbounded_instance())
     if known_args.serve_loopback_worker:
       beam_fn_api_pb2_grpc.add_BeamFnExternalWorkerPoolServicer_to_server(
@@ -73,15 +71,9 @@ def main(argv):
         artifact_service.ArtifactRetrievalService(
             artifact_service.BeamFilesystemHandler(None).file_reader),
         server)
-    # Ensure gRPC server successfully binds. If this fails (e.g., due to port 
collision),
-    # add_insecure_port returns 0. We raise an error to crash the subprocess 
immediately,
-    # allowing the parent process to detect it and fail fast rather than 
hanging.
-    bound_port = server.add_insecure_port(address)
-    if not bound_port:
-      raise RuntimeError(
-          "Failed to bind expansion service to {}".format(address))
+    server.add_insecure_port(address)
     server.start()
-    _LOGGER.info('Listening for expansion requests at %d', bound_port)
+    _LOGGER.info('Listening for expansion requests at %d', known_args.port)
 
     def cleanup(unused_signum, unused_frame):
       _LOGGER.info('Shutting down expansion service.')
diff --git a/sdks/python/apache_beam/runners/portability/stager.py 
b/sdks/python/apache_beam/runners/portability/stager.py
index e862fde4efe..136c320da00 100644
--- a/sdks/python/apache_beam/runners/portability/stager.py
+++ b/sdks/python/apache_beam/runners/portability/stager.py
@@ -732,9 +732,11 @@ class Stager(object):
     # addressed, download wheel based on glibc version in Beam's Python
     # Base image
     pip_version = distribution('pip').version
-    if version.parse(pip_version) >= version.parse('19.3'):
-      # pip can only recognize manylinux2014_x86_64 wheels
-      # from version 19.3.
+    # See more information about manylinux at
+    # https://github.com/pypa/manylinux
+    if version.parse(pip_version) >= version.parse('20.3'):
+      return 'manylinux_2_28_x86_64'
+    elif version.parse(pip_version) >= version.parse('19.3'):
       return 'manylinux2014_x86_64'
     else:
       return 'manylinux2010_x86_64'
diff --git a/sdks/python/apache_beam/utils/subprocess_server.py 
b/sdks/python/apache_beam/utils/subprocess_server.py
index b22e6badb5e..0b09b364362 100644
--- a/sdks/python/apache_beam/utils/subprocess_server.py
+++ b/sdks/python/apache_beam/utils/subprocess_server.py
@@ -118,6 +118,12 @@ class _SharedCache:
         self._cache[key].owners.add(owner)
       return self._cache[key].obj
 
+  def force_remove(self, *key):
+    with self._lock:
+      entry = self._cache.pop(key, None)
+    if entry is not None:
+      self._destructor(entry.obj)
+
 
 class JavaHelper:
   @classmethod
@@ -186,53 +192,45 @@ class SubprocessServer(object):
     self.stop()
 
   def start(self):
-    max_attempts = 3
-    for attempt in range(max_attempts):
-      try:
-        process, endpoint = self.start_process()
-        wait_secs = .1
-        channel_options = [
-            ("grpc.max_receive_message_length", -1),
-            ("grpc.max_send_message_length", -1),
-            # Default: 20000ms (20s), increased to 10 minutes for stability
-            ("grpc.keepalive_timeout_ms", 600_000),
-            # Default: 2, set to 0 to allow unlimited pings without data
-            ("grpc.http2.max_pings_without_data", 0),
-            # Default: False, set to True to allow keepalive pings when no 
calls
-            ("grpc.keepalive_permit_without_calls", True),
-            # Default: 2, set to 0 to allow unlimited ping strikes
-            ("grpc.http2.max_ping_strikes", 0),
-            # Default: 0 (disabled), enable socket reuse for better handling
-            ("grpc.so_reuseport", 1),
-        ]
-        self._grpc_channel = grpc.insecure_channel(
-            endpoint, options=channel_options)
-        channel_ready = grpc.channel_ready_future(self._grpc_channel)
-        while True:
-          if process is not None and process.poll() is not None:
-            _LOGGER.error("Started job service with %s", process.args)
-            raise RuntimeError(
-                'Service failed to start up with error %s' % process.poll())
-          try:
-            channel_ready.result(timeout=wait_secs)
-            break
-          except (grpc.FutureTimeoutError, grpc.RpcError):
-            wait_secs *= 1.2
-            logging.log(
-                logging.WARNING if wait_secs > 1 else logging.DEBUG,
-                'Waiting for grpc channel to be ready at %s.',
-                endpoint)
-        return self._stub_class(self._grpc_channel)
-      except Exception as e:
-        _LOGGER.warning(
-            "Error bringing up service on attempt %d: %s",
-            attempt + 1,
-            e,
-            exc_info=True)
-        self.stop()
-        if attempt == max_attempts - 1:
-          raise
-        time.sleep(1)
+    try:
+      process, endpoint = self.start_process()
+      wait_secs = .1
+      channel_options = [
+          ("grpc.max_receive_message_length", -1),
+          ("grpc.max_send_message_length", -1),
+          # Default: 20000ms (20s), increased to 10 minutes for stability
+          ("grpc.keepalive_timeout_ms", 600_000),
+          # Default: 2, set to 0 to allow unlimited pings without data
+          ("grpc.http2.max_pings_without_data", 0),
+          # Default: False, set to True to allow keepalive pings when no calls
+          ("grpc.keepalive_permit_without_calls", True),
+          # Default: 2, set to 0 to allow unlimited ping strikes
+          ("grpc.http2.max_ping_strikes", 0),
+          # Default: 0 (disabled), enable socket reuse for better handling
+          ("grpc.so_reuseport", 1),
+      ]
+      self._grpc_channel = grpc.insecure_channel(
+          endpoint, options=channel_options)
+      channel_ready = grpc.channel_ready_future(self._grpc_channel)
+      while True:
+        if process is not None and process.poll() is not None:
+          _LOGGER.error("Started job service with %s", process.args)
+          raise RuntimeError(
+              'Service failed to start up with error %s' % process.poll())
+        try:
+          channel_ready.result(timeout=wait_secs)
+          break
+        except (grpc.FutureTimeoutError, grpc.RpcError):
+          wait_secs *= 1.2
+          logging.log(
+              logging.WARNING if wait_secs > 1 else logging.DEBUG,
+              'Waiting for grpc channel to be ready at %s.',
+              endpoint)
+      return self._stub_class(self._grpc_channel)
+    except:  # pylint: disable=bare-except
+      _LOGGER.exception("Error bringing up service")
+      self.stop_force()
+      raise
 
   def start_process(self):
     if self._owner_id is not None:
@@ -282,6 +280,20 @@ class SubprocessServer(object):
       finally:
         self._grpc_channel = None
 
+  def stop_force(self):
+    try:
+      self._cache.force_remove(tuple(self._cmd), self._port, self._logger)
+    finally:
+      self._owner_id = None
+    if self._grpc_channel:
+      try:
+        self._grpc_channel.close()
+      except:  # pylint: disable=bare-except
+        _LOGGER.error(
+            "Could not close the gRPC channel started with cmd %s", self._cmd)
+      finally:
+        self._grpc_channel = None
+
   def _really_stop_process(process_and_endpoint):
     process, _ = process_and_endpoint  # pylint: disable=unpacking-non-sequence
     if not process:
diff --git a/sdks/python/apache_beam/utils/subprocess_server_test.py 
b/sdks/python/apache_beam/utils/subprocess_server_test.py
index 073b8b3bcbe..a44b89b17e3 100644
--- a/sdks/python/apache_beam/utils/subprocess_server_test.py
+++ b/sdks/python/apache_beam/utils/subprocess_server_test.py
@@ -464,6 +464,93 @@ class CacheTest(unittest.TestCase):
     # without raising ValueError.
     server.stop_process()
 
+  def test_force_remove(self):
+    destructor_calls = []
+
+    def custom_destructor(obj):
+      destructor_calls.append(obj)
+
+    cache = subprocess_server._SharedCache(self.with_prefix, custom_destructor)
+
+    owner1 = cache.register()
+    owner2 = cache.register()
+
+    # Get object 'a' under both active owners
+    a = cache.get('a')
+    self.assertEqual(a[0], 'a')
+    self.assertIn(('a', ), cache._cache)
+
+    # force_remove on a non-existent key should be a safe no-op
+    cache.force_remove('non_existent')
+
+    # Call force_remove, which should bypass the owners check and delete it 
immediately
+    cache.force_remove('a')
+
+    # The cache entry should be gone
+    self.assertNotIn(('a', ), cache._cache)
+
+    # Destructor must be called on 'a'
+    self.assertEqual(destructor_calls, [a])
+
+    # Retrieving 'a' again under the active owners should construct a new 
object
+    new_a = cache.get('a')
+    self.assertNotEqual(new_a, a)
+    self.assertEqual(new_a[0], 'a')
+
+    # Clean up
+    cache.purge(owner1)
+    cache.purge(owner2)
+
+  def test_subprocess_server_start_failed_no_leak(self):
+    destructor_calls = []
+
+    def custom_destructor(obj):
+      destructor_calls.append(obj)
+
+    class DummyProcess:
+      def __init__(self):
+        self.args = ["dummy_cmd"]
+
+      def poll(self):
+        return 1  # Simulate that process exited/failed
+
+    dummy_process = DummyProcess()
+    cache = subprocess_server._SharedCache(
+        lambda *args: (dummy_process, "localhost:12345"), custom_destructor)
+
+    # 1. Register an independent, unrelated owner in the cache first.
+    other_owner = cache.register()
+
+    class CustomServer(subprocess_server.SubprocessServer):
+      _cache = cache
+
+      def __init__(self):
+        super().__init__(lambda channel: None, ["dummy_cmd"], port=12345)
+
+    server = CustomServer()
+    # Fetch the process using other_owner, creating the cache entry and 
registering other_owner on it.
+    cache.get(tuple(server._cmd), server._port, server._logger)
+
+    cache_key = (tuple(server._cmd), server._port, server._logger)
+    self.assertIn(cache_key, cache._cache)
+    self.assertEqual(cache._cache[cache_key].owners, {other_owner})
+
+    # 2. Verify starting the server (which registers its own owner and 
retrieves from cache) raises RuntimeError
+    with self.assertRaises(RuntimeError):
+      server.start()
+
+    # 3. Verify that the destructor was called on the process, meaning no leak 
(even though other_owner was still registered!)
+    self.assertEqual(destructor_calls, [(dummy_process, "localhost:12345")])
+
+    # 4. Verify that the server has cleaned up its owner_id
+    self.assertIsNone(server._owner_id)
+
+    # 5. Verify the cache entry has been removed completely
+    self.assertNotIn(cache_key, cache._cache)
+
+    # Clean up the other owner
+    cache.purge(other_owner)
+
 
 if __name__ == '__main__':
   unittest.main()

Reply via email to