This is an automated email from the ASF dual-hosted git repository.

damccorm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new f4d03d49713 Minimize scope of expensive lock (#30679)
f4d03d49713 is described below

commit f4d03d49713cf89260c141ee35b4dadb31ad4193
Author: Danny McCormick <dannymccorm...@google.com>
AuthorDate: Wed Mar 20 16:42:38 2024 -0400

    Minimize scope of expensive lock (#30679)
    
    * Minimize scope of expensive lock
    
    * Build list in lock
    
    * Update sdks/python/apache_beam/runners/worker/sdk_worker.py
    
    Co-authored-by: tvalentyn <tvalen...@users.noreply.github.com>
    
    * Move comment
    
    * Simplify
    
    * fmt
    
    ---------
    
    Co-authored-by: tvalentyn <tvalen...@users.noreply.github.com>
---
 sdks/python/apache_beam/runners/worker/sdk_worker.py | 14 ++++++++++----
 1 file changed, 10 insertions(+), 4 deletions(-)

diff --git a/sdks/python/apache_beam/runners/worker/sdk_worker.py 
b/sdks/python/apache_beam/runners/worker/sdk_worker.py
index 04b58a6f198..65059ab054f 100644
--- a/sdks/python/apache_beam/runners/worker/sdk_worker.py
+++ b/sdks/python/apache_beam/runners/worker/sdk_worker.py
@@ -607,12 +607,18 @@ class BundleProcessorCache(object):
     # type: () -> None
     def shutdown_inactive_bundle_processors():
       # type: () -> None
+      inactive_descriptor_ids = []
+      inactive_time = time.time(
+      ) - DEFAULT_BUNDLE_PROCESSOR_CACHE_SHUTDOWN_THRESHOLD_S
       with self._lock:
         for descriptor_id, last_access_time in self.last_access_times.items():
-          if (time.time() - last_access_time >
-              DEFAULT_BUNDLE_PROCESSOR_CACHE_SHUTDOWN_THRESHOLD_S):
-            BundleProcessorCache._shutdown_cached_bundle_processors(
-                self.cached_bundle_processors[descriptor_id])
+          if (inactive_time > last_access_time):
+            inactive_descriptor_ids.append(descriptor_id)
+
+      # Shutdown can be expensive, keep out of lock
+      for descriptor_id in inactive_descriptor_ids:
+        BundleProcessorCache._shutdown_cached_bundle_processors(
+            self.cached_bundle_processors[descriptor_id])
 
     self.periodic_shutdown = PeriodicThread(
         DEFAULT_BUNDLE_PROCESSOR_CACHE_SHUTDOWN_THRESHOLD_S,

Reply via email to