damccorm commented on code in PR #37112:
URL: https://github.com/apache/beam/pull/37112#discussion_r2687533731


##########
sdks/python/apache_beam/utils/multi_process_shared.py:
##########
@@ -79,6 +83,10 @@ def singletonProxy_release(self):
     assert self._SingletonProxy_valid
     self._SingletonProxy_valid = False
 
+  def unsafe_hard_delete(self):

Review Comment:
   Could you help me understand why we need the `unsafe_hard_delete` changes?
It's not really clear to me what behavior this enables that we can't already
achieve.



##########
sdks/python/apache_beam/utils/multi_process_shared.py:
##########
@@ -200,9 +226,99 @@ def __call__(self, *args, **kwargs):
   def __getattr__(self, name):
     return getattr(self._proxyObject, name)
 
+  def __setstate__(self, state):
+    self.__dict__.update(state)
+
+  def __getstate__(self):
+    return self.__dict__

Review Comment:
   I assume this is to make the proxy picklable, but is that valid? Normally
I'd expect this not to be picklable, since the proxy objects aren't
necessarily valid in another context.
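
For context, here is a standalone `multiprocessing.managers` sketch (not the Beam classes) of the concern: a live proxy is picklable because only the server address and token travel with it, but the rebuilt copy is only usable while that manager server is still reachable.

```python
# Standalone illustration (not Beam code): a rebuilt proxy only works
# while the manager server it points at is still alive.
import multiprocessing.managers
import pickle


class _Counter:
  def __init__(self):
    self.value = 0

  def increment(self):
    self.value += 1
    return self.value


class _Manager(multiprocessing.managers.BaseManager):
  pass


_Manager.register('Counter', _Counter)

if __name__ == '__main__':
  manager = _Manager()
  manager.start()
  proxy = manager.Counter()

  # Pickling serializes the token/address, not the underlying object.
  restored = pickle.loads(pickle.dumps(proxy))
  print(restored.increment())  # -> 1, still talks to the same server

  manager.shutdown()
  try:
    restored.increment()
  except (EOFError, OSError) as e:  # server is gone; the proxy is now invalid
    print('proxy no longer valid:', type(e).__name__)
```

Whether round-tripping the wrapper's `__dict__` behaves any better presumably depends on what state the wrapped proxy carries, which is what the question above is getting at.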



##########
sdks/python/apache_beam/utils/multi_process_shared.py:
##########
@@ -200,9 +226,99 @@ def __call__(self, *args, **kwargs):
   def __getattr__(self, name):
     return getattr(self._proxyObject, name)
 
+  def __setstate__(self, state):
+    self.__dict__.update(state)
+
+  def __getstate__(self):
+    return self.__dict__
+
   def get_auto_proxy_object(self):
     return self._proxyObject
 
+  def unsafe_hard_delete(self):
+    try:
+      self._proxyObject.unsafe_hard_delete()
+    except (EOFError, ConnectionResetError, BrokenPipeError):
+      pass
+    except Exception as e:
+      logging.warning(
+          "Exception %s when trying to hard delete shared object proxy", e)
+
+
+def _run_server_process(address_file, tag, constructor, authkey):
+  """
+    Runs in a separate process.
+    Includes a 'Suicide Pact' monitor: If parent dies, I die.
+    """
+  parent_pid = os.getppid()
+
+  def cleanup_files():
+    logging.info("Server process exiting. Deleting files for %s", tag)
+    try:
+      if os.path.exists(address_file):
+        os.remove(address_file)
+      if os.path.exists(address_file + ".error"):
+        os.remove(address_file + ".error")
+    except Exception:
+      pass
+
+  def handle_unsafe_hard_delete():
+    cleanup_files()
+    os._exit(0)
+
+  def _monitor_parent():
+    """Checks if parent is alive every second."""
+    while True:
+      try:
+        os.kill(parent_pid, 0)
+      except OSError:
+        logging.warning(
+            "Process %s detected Parent %s died. Self-destructing.",
+            os.getpid(),
+            parent_pid)
+        cleanup_files()
+        os._exit(0)
+      time.sleep(0.5)
+
+  atexit.register(cleanup_files)
+
+  try:
+    t = threading.Thread(target=_monitor_parent, daemon=True)
+    t.start()

Review Comment:
   It might be better to start this thread after we've initialized our
MultiProcessShared object, to avoid racy unsafe hard deletes.
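
Something along these lines, as a rough sketch (names here are illustrative, not the actual patch): finish constructing the shared object, then start the liveness monitor.

```python
import os
import threading
import time


def _serve_shared_object(parent_pid, build_shared_object):
  """Illustrative only: initialize before monitoring, so a parent exit
  observed during startup cannot trigger a premature self-destruct."""
  shared = build_shared_object()  # finish initialization first

  def _monitor_parent():
    while True:
      try:
        os.kill(parent_pid, 0)  # signal 0: existence check, no signal sent
      except OSError:
        os._exit(0)  # parent is gone; shut down
      time.sleep(0.5)

  # Only start watching the parent once the object is fully constructed.
  threading.Thread(target=_monitor_parent, daemon=True).start()
  return shared
```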



##########
sdks/python/apache_beam/utils/multi_process_shared.py:
##########
@@ -315,25 +437,102 @@ def unsafe_hard_delete(self):
       to this object exist, or (b) you are ok with all existing references to
      this object throwing strange errors when dereferenced.
     """
-    self._get_manager().unsafe_hard_delete_singleton(self._tag)
+    try:
+      self._get_manager().unsafe_hard_delete_singleton(self._tag)
+    except (EOFError, ConnectionResetError, BrokenPipeError):

Review Comment:
   I'd typically expect the caller to catch/handle this. As it is, there is no 
indication passed back that this call failed.
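
One possible shape, sketched against the method shown in the diff above (not a drop-in suggestion): surface the swallowed connection errors as a return value, or re-raise after logging, so the caller can tell that the hard delete never reached the server.

```python
# Sketch only; assumes the module's existing `logging` import and the
# `_get_manager`/`_tag` members that already appear in this class.
def unsafe_hard_delete(self):
  try:
    self._get_manager().unsafe_hard_delete_singleton(self._tag)
    return True
  except (EOFError, ConnectionResetError, BrokenPipeError):
    logging.warning(
        "Could not reach the shared server to hard delete %s; "
        "it may already be gone.", self._tag)
    return False
```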



##########
sdks/python/apache_beam/utils/multi_process_shared.py:
##########
@@ -301,6 +419,10 @@ def acquire(self):
     # Caveat: They must always agree, as they will be ignored if the object
     # is already constructed.
     singleton = self._get_manager().acquire_singleton(self._tag)
+    # Trigger a sweep of zombie processes.
+    # calling active_children() has the side-effect of joining any finished
+    # processes, effectively reaping zombies from previous unsafe_hard_deletes.
+    if self._spawn_process: multiprocessing.active_children()

Review Comment:
   ```suggestion
       if self._spawn_process:
         multiprocessing.active_children()
   ```
   
   style nit to be consistent with the rest of the repo.



##########
sdks/python/apache_beam/utils/multi_process_shared.py:
##########
@@ -200,9 +226,99 @@ def __call__(self, *args, **kwargs):
   def __getattr__(self, name):
     return getattr(self._proxyObject, name)
 
+  def __setstate__(self, state):
+    self.__dict__.update(state)
+
+  def __getstate__(self):
+    return self.__dict__
+
   def get_auto_proxy_object(self):
     return self._proxyObject
 
+  def unsafe_hard_delete(self):
+    try:
+      self._proxyObject.unsafe_hard_delete()
+    except (EOFError, ConnectionResetError, BrokenPipeError):
+      pass
+    except Exception as e:
+      logging.warning(
+          "Exception %s when trying to hard delete shared object proxy", e)
+
+
+def _run_server_process(address_file, tag, constructor, authkey):
+  """
+    Runs in a separate process.
+    Includes a 'Suicide Pact' monitor: If parent dies, I die.
+    """
+  parent_pid = os.getppid()
+
+  def cleanup_files():
+    logging.info("Server process exiting. Deleting files for %s", tag)
+    try:
+      if os.path.exists(address_file):
+        os.remove(address_file)
+      if os.path.exists(address_file + ".error"):
+        os.remove(address_file + ".error")
+    except Exception:
+      pass
+
+  def handle_unsafe_hard_delete():
+    cleanup_files()
+    os._exit(0)
+
+  def _monitor_parent():
+    """Checks if parent is alive every second."""
+    while True:
+      try:
+        os.kill(parent_pid, 0)

Review Comment:
   Why are we sending a kill signal to the parent process? Isn't this the 
opposite of what we want?
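
For reference, a standalone POSIX illustration of the `os.kill(pid, 0)` idiom: signal number 0 delivers nothing and only performs error checking, so the call acts as a liveness probe that raises `OSError` once the PID no longer exists.

```python
import os
import subprocess

# POSIX-only example: signal 0 is a pure existence check; no signal is sent.
child = subprocess.Popen(['sleep', '1'])

os.kill(child.pid, 0)  # succeeds (and does nothing) while the child exists
print('child is alive')

child.wait()  # the child exits and is reaped
try:
  os.kill(child.pid, 0)
except OSError:
  print('child is gone: os.kill raised instead of signalling anything')
```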


