This is an automated email from the ASF dual-hosted git repository. damccorm pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push: new f4d03d49713 Minimize scope of expensive lock (#30679) f4d03d49713 is described below commit f4d03d49713cf89260c141ee35b4dadb31ad4193 Author: Danny McCormick <dannymccorm...@google.com> AuthorDate: Wed Mar 20 16:42:38 2024 -0400 Minimize scope of expensive lock (#30679) * Minimize scope of expensive lock * Build list in lock * Update sdks/python/apache_beam/runners/worker/sdk_worker.py Co-authored-by: tvalentyn <tvalen...@users.noreply.github.com> * Move comment * Simplify * fmt --------- Co-authored-by: tvalentyn <tvalen...@users.noreply.github.com> --- sdks/python/apache_beam/runners/worker/sdk_worker.py | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/sdks/python/apache_beam/runners/worker/sdk_worker.py b/sdks/python/apache_beam/runners/worker/sdk_worker.py index 04b58a6f198..65059ab054f 100644 --- a/sdks/python/apache_beam/runners/worker/sdk_worker.py +++ b/sdks/python/apache_beam/runners/worker/sdk_worker.py @@ -607,12 +607,18 @@ class BundleProcessorCache(object): # type: () -> None def shutdown_inactive_bundle_processors(): # type: () -> None + inactive_descriptor_ids = [] + inactive_time = time.time( + ) - DEFAULT_BUNDLE_PROCESSOR_CACHE_SHUTDOWN_THRESHOLD_S with self._lock: for descriptor_id, last_access_time in self.last_access_times.items(): - if (time.time() - last_access_time > - DEFAULT_BUNDLE_PROCESSOR_CACHE_SHUTDOWN_THRESHOLD_S): - BundleProcessorCache._shutdown_cached_bundle_processors( - self.cached_bundle_processors[descriptor_id]) + if (inactive_time > last_access_time): + inactive_descriptor_ids.append(descriptor_id) + + # Shutdown can be expensive, keep out of lock + for descriptor_id in inactive_descriptor_ids: + BundleProcessorCache._shutdown_cached_bundle_processors( + self.cached_bundle_processors[descriptor_id]) self.periodic_shutdown = PeriodicThread( DEFAULT_BUNDLE_PROCESSOR_CACHE_SHUTDOWN_THRESHOLD_S,