gemini-code-assist[bot] commented on code in PR #38620:
URL: https://github.com/apache/beam/pull/38620#discussion_r3304973506


##########
sdks/python/apache_beam/utils/subprocess_server.py:
##########
@@ -108,14 +108,22 @@ def purge(self, owner):
     for value in to_delete:
       self._destructor(value)
 
-  def get(self, *key):
+  def get(self, *key, owner=None):
     if not self._live_owners:
       raise RuntimeError("At least one owner must be registered.")
     with self._lock:
       if key not in self._cache:
         self._cache[key] = _SharedCacheEntry(self._constructor(*key), set())
-      for owner in self._live_owners:
+      if owner is not None:
+        if owner not in self._live_owners:
+          raise RuntimeError("The requesting owner must be registered.")
         self._cache[key].owners.add(owner)
+        for live_owner, is_context in self._live_owners.items():
+          if is_context:
+            self._cache[key].owners.add(live_owner)
+      else:
+        for live_owner in self._live_owners:
+          self._cache[key].owners.add(live_owner)
       return self._cache[key].obj

Review Comment:
   ![high](https://www.gstatic.com/codereviewagent/high-priority.svg)
   
   ### Potential Resource Leaks and Race Conditions in `get()`
   
   There are two critical issues in the current implementation of `get()` that
   can lead to silent resource leaks (orphaned subprocesses):
   
   1. **Race Condition on `self._live_owners` Check**:
      `get()` checks `self._live_owners` (lines 112-113) before it acquires
      `self._lock`, so the check and the cache update are not atomic. If another
      thread purges the last owner after this check but before the lock is
      acquired, `get()` still creates the cache entry, but no live owner is left
      to add to its `owners` set. Because the entry has no owners, no subsequent
      `purge()` call will clean it up, and the subprocess is leaked.
   
   2. **Resource Leak on Invalid Owner**:
      If `get()` is called with an unregistered `owner` for a key that is not
      cached yet, the cache entry (and its subprocess) is created and added to
      `self._cache` *before* the owner is validated (lines 118-119). When the
      `RuntimeError` is raised, the new entry stays in `self._cache` with an
      empty `owners` set, permanently leaking the subprocess.
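Both failure modes can be reproduced deterministically with a small model of the ordering in the diff. This is a hypothetical sketch, not Beam code: `ToySharedCache`, `Entry`, `HookedLock` and `start_process` are illustrative names, and `purge()` is assumed to reclaim only entries that the purged owner held (the behavior the leak argument above relies on). `HookedLock` runs a one-shot callback just before the lock is acquired, which stands in for another thread calling `purge()` in that window.

```python
import threading
from dataclasses import dataclass, field


@dataclass
class Entry:
  obj: object
  owners: set = field(default_factory=set)


class HookedLock:
  """Lock that runs a one-shot callback right before it is acquired."""

  def __init__(self):
    self._lock = threading.Lock()
    self.before_acquire = None

  def __enter__(self):
    hook, self.before_acquire = self.before_acquire, None
    if hook is not None:
      hook()  # runs while the lock is free, as a competing thread could
    self._lock.acquire()
    return self

  def __exit__(self, *exc_info):
    self._lock.release()
    return False


class ToySharedCache:
  """Hypothetical model of the get()/purge() ordering in the diff."""

  def __init__(self, constructor):
    self._constructor = constructor
    self._live_owners = {}  # owner -> is_context, as in the diff
    self._cache = {}
    self._lock = HookedLock()

  def register(self, owner, is_context=False):
    with self._lock:
      self._live_owners[owner] = is_context

  def purge(self, owner):
    with self._lock:
      del self._live_owners[owner]
      for key, entry in list(self._cache.items()):
        # Assumption: only entries held by the purged owner are reclaimed.
        if owner in entry.owners:
          entry.owners.remove(owner)
          if not entry.owners:
            del self._cache[key]  # a real cache would also stop the process

  def get(self, *key, owner=None):
    if not self._live_owners:  # issue 1: checked before taking the lock
      raise RuntimeError("At least one owner must be registered.")
    with self._lock:
      if key not in self._cache:
        self._cache[key] = Entry(self._constructor(*key))
      if owner is not None:
        if owner not in self._live_owners:  # issue 2: validated too late
          raise RuntimeError("The requesting owner must be registered.")
        self._cache[key].owners.add(owner)
        for live_owner, is_context in self._live_owners.items():
          if is_context:
            self._cache[key].owners.add(live_owner)
      else:
        for live_owner in self._live_owners:
          self._cache[key].owners.add(live_owner)
      return self._cache[key].obj


def start_process(*key):
  return "proc:" + "/".join(key)  # stands in for launching a subprocess


def demo_unregistered_owner():
  cache = ToySharedCache(start_process)
  cache.register("alice")
  try:
    cache.get("a", owner="mallory")  # "mallory" was never registered
  except RuntimeError:
    pass
  cache.purge("alice")  # does not reclaim ("a",), which has no owners
  return cache


def demo_purge_race():
  cache = ToySharedCache(start_process)
  cache.register("bob")
  # Another "thread" purges the last owner between the check and the lock.
  cache._lock.before_acquire = lambda: cache.purge("bob")
  cache.get("b")
  return cache


for demo in (demo_unregistered_owner, demo_purge_race):
  print(demo.__name__, {k: e.owners for k, e in demo()._cache.items()})
```

In both scenarios the model ends with an entry whose `owners` set is empty and no live owner that could ever purge it.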
   
   ### Solution:
   - Move the `self._live_owners` check inside the lock.
   - Validate the `owner` registration *before* creating the cache entry.
   
   ```suggestion
     def get(self, *key, owner=None):
       with self._lock:
         if not self._live_owners:
           raise RuntimeError("At least one owner must be registered.")
         if owner is not None and owner not in self._live_owners:
           raise RuntimeError("The requesting owner must be registered.")
   
         if key not in self._cache:
           self._cache[key] = _SharedCacheEntry(self._constructor(*key), set())
   
         if owner is not None:
           self._cache[key].owners.add(owner)
           for live_owner, is_context in self._live_owners.items():
             if is_context:
               self._cache[key].owners.add(live_owner)
         else:
           for live_owner in self._live_owners:
             self._cache[key].owners.add(live_owner)
         return self._cache[key].obj
   ```
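The suggested ordering can be checked with a deterministic regression scenario. The sketch below is hypothetical: `FixedToySharedCache` is a simplified model (not Beam's actual cache class) whose `get()` follows the suggestion, and `start_process` is a made-up stand-in for the real constructor that records each launch. Because both checks now run under the lock, a `purge()` that wins the race to the lock is equivalent to one that ran before `get()` was called, so the race can be exercised sequentially.

```python
import threading
from dataclasses import dataclass, field


@dataclass
class Entry:
  obj: object
  owners: set = field(default_factory=set)


class FixedToySharedCache:
  """Hypothetical model whose get() follows the suggested ordering."""

  def __init__(self, constructor):
    self._constructor = constructor
    self._live_owners = {}  # owner -> is_context
    self._cache = {}
    self._lock = threading.Lock()

  def register(self, owner, is_context=False):
    with self._lock:
      self._live_owners[owner] = is_context

  def purge(self, owner):
    with self._lock:
      del self._live_owners[owner]
      for key, entry in list(self._cache.items()):
        if owner in entry.owners:
          entry.owners.remove(owner)
          if not entry.owners:
            del self._cache[key]

  def get(self, *key, owner=None):
    with self._lock:
      # Both checks run under the lock, before anything is constructed.
      if not self._live_owners:
        raise RuntimeError("At least one owner must be registered.")
      if owner is not None and owner not in self._live_owners:
        raise RuntimeError("The requesting owner must be registered.")
      if key not in self._cache:
        self._cache[key] = Entry(self._constructor(*key))
      entry = self._cache[key]
      if owner is not None:
        # The requester plus every context-managed owner share the entry.
        entry.owners.add(owner)
        entry.owners.update(
            o for o, is_context in self._live_owners.items() if is_context)
      else:
        entry.owners.update(self._live_owners)
      return entry.obj


started = []


def start_process(*key):
  started.append(key)  # records each (simulated) subprocess launch
  return "proc:" + "/".join(key)


cache = FixedToySharedCache(start_process)
cache.register("alice")
try:
  cache.get("a", owner="mallory")  # unregistered owner
except RuntimeError as e:
  print("rejected:", e)
cache.purge("alice")  # a purge that wins the race to the lock
try:
  cache.get("b")
except RuntimeError as e:
  print("rejected:", e)
print(started, cache._cache)  # -> [] {}
```

Since both checks happen before `self._constructor` is called, a rejected call never starts a subprocess, so there is nothing left to clean up.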



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
