AMOOOMA commented on code in PR #37112:
URL: https://github.com/apache/beam/pull/37112#discussion_r2688510394
##########
sdks/python/apache_beam/utils/multi_process_shared.py:
##########
@@ -301,6 +419,10 @@ def acquire(self):
# Caveat: They must always agree, as they will be ignored if the object
# is already constructed.
singleton = self._get_manager().acquire_singleton(self._tag)
+ # Trigger a sweep of zombie processes.
+ # calling active_children() has the side-effect of joining any finished
+ # processes, effectively reaping zombies from previous unsafe_hard_deletes.
+ if self._spawn_process: multiprocessing.active_children()
Review Comment:
Updated.
##########
sdks/python/apache_beam/utils/multi_process_shared.py:
##########
@@ -200,9 +226,99 @@ def __call__(self, *args, **kwargs):
def __getattr__(self, name):
return getattr(self._proxyObject, name)
+ def __setstate__(self, state):
+ self.__dict__.update(state)
+
+ def __getstate__(self):
+ return self.__dict__
+
def get_auto_proxy_object(self):
return self._proxyObject
+ def unsafe_hard_delete(self):
+ try:
+ self._proxyObject.unsafe_hard_delete()
+ except (EOFError, ConnectionResetError, BrokenPipeError):
+ pass
+ except Exception as e:
+ logging.warning(
+ "Exception %s when trying to hard delete shared object proxy", e)
+
+
+def _run_server_process(address_file, tag, constructor, authkey):
+ """
+ Runs in a separate process.
+ Includes a 'Suicide Pact' monitor: If parent dies, I die.
+ """
+ parent_pid = os.getppid()
+
+ def cleanup_files():
+ logging.info("Server process exiting. Deleting files for %s", tag)
+ try:
+ if os.path.exists(address_file):
+ os.remove(address_file)
+ if os.path.exists(address_file + ".error"):
+ os.remove(address_file + ".error")
+ except Exception:
+ pass
+
+ def handle_unsafe_hard_delete():
+ cleanup_files()
+ os._exit(0)
+
+ def _monitor_parent():
+ """Checks if parent is alive every second."""
+ while True:
+ try:
+ os.kill(parent_pid, 0)
+ except OSError:
+ logging.warning(
+ "Process %s detected Parent %s died. Self-destructing.",
+ os.getpid(),
+ parent_pid)
+ cleanup_files()
+ os._exit(0)
+ time.sleep(0.5)
+
+ atexit.register(cleanup_files)
+
+ try:
+ t = threading.Thread(target=_monitor_parent, daemon=True)
+ t.start()
Review Comment:
Updated.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: github-unsubscribe@beam.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org