AMOOOMA commented on code in PR #37112:
URL: https://github.com/apache/beam/pull/37112#discussion_r2747666234


##########
sdks/python/apache_beam/utils/multi_process_shared.py:
##########
@@ -203,6 +228,89 @@ def __getattr__(self, name):
   def get_auto_proxy_object(self):
     return self._proxyObject
 
+  def unsafe_hard_delete(self):
+    """Forwards the hard-delete request to the wrapped proxy object."""
+    self._proxyObject.unsafe_hard_delete()
+
+
+def _run_server_process(address_file, tag, constructor, authkey):
+  """
+    Runs in a separate process.
+    Includes a 'Suicide Pact' monitor: If parent dies, I die.
+    """
+  parent_pid = os.getppid()
+
+  def cleanup_files():
+    logging.info("Server process exiting. Deleting files for %s", tag)
+    try:
+      if os.path.exists(address_file):
+        os.remove(address_file)
+      if os.path.exists(address_file + ".error"):
+        os.remove(address_file + ".error")
+    except Exception as e:
+      logging.warning('Failed to cleanup files for tag %s: %s', tag, e)
+
+  def handle_unsafe_hard_delete():
+    cleanup_files()
+    os._exit(0)
+
+  def _monitor_parent():
+    """Checks if parent is alive every 0.5 seconds."""
+    while True:
+      try:
+        # Sends a check to see if parent_pid is still alive,
+        # this call will fail with OSError if the parent has died
+        # and no-op if alive.
+        os.kill(parent_pid, 0)
+      except OSError:
+        logging.warning(
+            "Process %s detected Parent %s died. Self-destructing.",
+            os.getpid(),
+            parent_pid)
+        cleanup_files()
+        os._exit(0)
+      time.sleep(0.5)
+
+  atexit.register(cleanup_files)
+
+  try:
+    t = threading.Thread(target=_monitor_parent, daemon=True)
+
+    logging.getLogger().setLevel(logging.INFO)
+    multiprocessing.current_process().authkey = authkey
+
+    serving_manager = _SingletonRegistrar(
+        address=('localhost', 0), authkey=authkey)
+    _process_level_singleton_manager.set_hard_delete_callback(
+        handle_unsafe_hard_delete)
+    _process_level_singleton_manager.register_singleton(
+        constructor,
+        tag,
+        initialize_eagerly=True,
+        hard_delete_callback=handle_unsafe_hard_delete)
+    # Start monitoring parent after initialisation is done to avoid
+    # potential race conditions.
+    t.start()
+
+    server = serving_manager.get_server()
+    logging.info(
+        'Process %s: Proxy serving %s at %s', os.getpid(), tag, server.address)
+
+    with open(address_file + '.tmp', 'w') as fout:
+      fout.write('%s:%d' % server.address)
+    os.rename(address_file + '.tmp', address_file)
+
+    server.serve_forever()
+
+  except Exception:
+    tb = traceback.format_exc()
+    try:
+      with open(address_file + ".error.tmp", 'w') as fout:
+        fout.write(tb)
+      os.rename(address_file + ".error.tmp", address_file + ".error")
+    except Exception:
+      print(f"CRITICAL ERROR IN SHARED SERVER:\n{tb}", file=sys.stderr)

Review Comment:
   Done.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to