gemini-code-assist[bot] commented on code in PR #37560:
URL: https://github.com/apache/beam/pull/37560#discussion_r2793035412


##########
sdks/python/apache_beam/yaml/yaml_mapping.py:
##########
@@ -205,85 +189,162 @@ def py_value_to_js_dict(py_value):
     return py_value
 
 
-# TODO(yaml) Consider adding optional language version parameter to support
-#  ECMAScript 5 and 6
+class PythonMonkeyDispatcher:
+  def __init__(self):
+    self._req_queue = queue.Queue()
+    self._resp_events = {}
+    self._resp_data = {}
+    self._lock = threading.Lock()
+    self._thread = threading.Thread(target=self._worker, daemon=True)
+    self._started = False
+
+  def start(self):
+    with self._lock:
+      if not self._started:
+        self._thread.start()
+        self._started = True
+        atexit.register(self.stop)
+
+  def stop(self):
+    # This method is called on process exit.
+    # We force an immediate exit to avoid a segmentation fault that often occurs
+    # with pythonmonkey during standard Python interpreter finalization.
+    os._exit(0)
+
+  def _worker(self):
+    try:
+      import pythonmonkey as pm
+    except ImportError:
+      pm = None
+
+    self._pm = pm
+    self._cache = {}
+
+    while True:
+      req = self._req_queue.get()
+      if req is None:
+        break
+
+      req_id, type_str, payload = req
+      res = None
+      is_err = False
+      try:
+        if self._pm is None:
+          raise ImportError(
+              "PythonMonkey not installed or failed to import in worker thread."
+          )
+
+        if type_str == 'exec':
+          source, row = payload
+          if source not in self._cache:
+            self._cache[source] = self._pm.eval(f"({source})")
+          func = self._cache[source]
+          res = func(row)
+      except Exception as e:
+        res = e
+        is_err = True
+
+      with self._lock:
+        if req_id in self._resp_events:
+          self._resp_data[req_id] = (is_err, res)
+          self._resp_events[req_id].set()
+
+  def eval_and_run(self, source, row):
+    if not self._started:
+      self.start()
+
+    req_id = str(uuid.uuid4())
+    event = threading.Event()
+    with self._lock:
+      self._resp_events[req_id] = event
+
+    self._req_queue.put((req_id, 'exec', (source, row)))
+    event.wait()
+
+    with self._lock:
+      is_err, result = self._resp_data.pop(req_id)
+      del self._resp_events[req_id]
+
+    if is_err:
+      raise result
+    return result
+
+
+_pythonmonkey_dispatcher = PythonMonkeyDispatcher()
+
+
+class JavaScriptCallable:
+  def __init__(self, source, original_fields=None, name=None):
+    self._source = source
+    self._name = name
+
+  def __call__(self, row):
+    # Check for pythonmonkey availability lazily (on first call)
+    if importlib.util.find_spec("pythonmonkey") is None:
+      raise RuntimeError(
+          "PythonMonkey is not installed. Please install 'apache_beam[yaml]' "
+          "to use JavaScript mapping functions.")
+
+    row_as_dict = py_value_to_js_dict(row)
+    try:
+      # If we have a name, it means we evaluated a file and need to call
+      # a specific function.
+      # Dispatcher expects a self-contained source/expression.
+      if self._name:
+        # Wrap: (function() { <source>; return <name>; })()
+        effective_source = (
+            f"(function() {{ {self._source}; return {self._name}; }})()")
+      else:
+        # Expression/Callable case: Wrap in parens to be safe
+        effective_source = f"({self._source})"
+
+      js_result = _pythonmonkey_dispatcher.eval_and_run(
+          effective_source, row_as_dict)
+
+    except Exception as exn:
+      raise RuntimeError(
+          f"Error evaluating javascript expression: {exn}") from exn
+    return dicts_to_rows(_finalize_js_result(js_result))
+
+
+def _finalize_js_result(obj):
+  """Coerce pythonmonkey objects to native Python objects (specifically
+  strings).
+  """
+  if type(obj) is str:
+    return obj
+  if isinstance(obj, str):
+    return str(obj)

Review Comment:
   ![medium](https://www.gstatic.com/codereviewagent/medium-priority.svg)
   
   These two `if` blocks for string handling can be simplified into one. 
`isinstance(obj, str)` covers both `str` and its subclasses, and `str(obj)` is 
a safe conversion for both.
   
   ```suggestion
     if isinstance(obj, str):
       return str(obj)
   ```



##########
sdks/python/apache_beam/yaml/yaml_mapping.py:
##########
@@ -205,85 +189,162 @@ def py_value_to_js_dict(py_value):
     return py_value
 
 
-# TODO(yaml) Consider adding optional language version parameter to support
-#  ECMAScript 5 and 6
+class PythonMonkeyDispatcher:
+  def __init__(self):
+    self._req_queue = queue.Queue()
+    self._resp_events = {}
+    self._resp_data = {}
+    self._lock = threading.Lock()
+    self._thread = threading.Thread(target=self._worker, daemon=True)
+    self._started = False
+
+  def start(self):
+    with self._lock:
+      if not self._started:
+        self._thread.start()
+        self._started = True
+        atexit.register(self.stop)
+
+  def stop(self):
+    # This method is called on process exit.
+    # We force an immediate exit to avoid a segmentation fault that often occurs
+    # with pythonmonkey during standard Python interpreter finalization.
+    os._exit(0)

Review Comment:
   ![critical](https://www.gstatic.com/codereviewagent/critical.svg)
   
   The use of `os._exit(0)` is extremely dangerous: it terminates the process
   immediately, bypassing standard cleanup such as pending `finally` blocks and
   any remaining `atexit` handlers. This can lead to severe issues such as data
   loss from unflushed file buffers, corrupted state, and orphaned resources.
   Because the status is hard-coded to `0`, it also makes the process report
   success even when it is exiting because of an error.
   
   While the comment mentions this is to avoid a segmentation fault from 
`pythonmonkey`, this approach should be a last resort. Have you considered 
exploring other ways to gracefully shut down the `pythonmonkey` engine or the 
worker thread? The `pythonmonkey` library is actively maintained, so it might 
be worth checking their documentation for a de-initialization function or 
filing an issue with them if one doesn't exist.
   
   If `os._exit(0)` is truly unavoidable, the comment should be expanded to 
more strongly warn about the potential for data loss and other side effects.



##########
sdks/python/apache_beam/yaml/yaml_mapping.py:
##########
@@ -205,85 +189,162 @@ def py_value_to_js_dict(py_value):
     return py_value
 
 
-# TODO(yaml) Consider adding optional language version parameter to support
-#  ECMAScript 5 and 6
+class PythonMonkeyDispatcher:
+  def __init__(self):
+    self._req_queue = queue.Queue()
+    self._resp_events = {}
+    self._resp_data = {}
+    self._lock = threading.Lock()
+    self._thread = threading.Thread(target=self._worker, daemon=True)
+    self._started = False
+
+  def start(self):
+    with self._lock:
+      if not self._started:
+        self._thread.start()
+        self._started = True
+        atexit.register(self.stop)
+
+  def stop(self):
+    # This method is called on process exit.
+    # We force an immediate exit to avoid a segmentation fault that often occurs
+    # with pythonmonkey during standard Python interpreter finalization.
+    os._exit(0)
+
+  def _worker(self):
+    try:
+      import pythonmonkey as pm
+    except ImportError:
+      pm = None
+
+    self._pm = pm
+    self._cache = {}
+
+    while True:
+      req = self._req_queue.get()
+      if req is None:
+        break
+
+      req_id, type_str, payload = req
+      res = None
+      is_err = False
+      try:
+        if self._pm is None:
+          raise ImportError(
+              "PythonMonkey not installed or failed to import in worker thread."
+          )
+
+        if type_str == 'exec':
+          source, row = payload
+          if source not in self._cache:
+            self._cache[source] = self._pm.eval(f"({source})")
+          func = self._cache[source]
+          res = func(row)
+      except Exception as e:
+        res = e
+        is_err = True
+
+      with self._lock:
+        if req_id in self._resp_events:
+          self._resp_data[req_id] = (is_err, res)
+          self._resp_events[req_id].set()
+
+  def eval_and_run(self, source, row):
+    if not self._started:
+      self.start()
+
+    req_id = str(uuid.uuid4())
+    event = threading.Event()
+    with self._lock:
+      self._resp_events[req_id] = event
+
+    self._req_queue.put((req_id, 'exec', (source, row)))
+    event.wait()
+
+    with self._lock:
+      is_err, result = self._resp_data.pop(req_id)
+      del self._resp_events[req_id]
+
+    if is_err:
+      raise result
+    return result
+
+
+_pythonmonkey_dispatcher = PythonMonkeyDispatcher()
+
+
+class JavaScriptCallable:
+  def __init__(self, source, original_fields=None, name=None):

Review Comment:
   ![medium](https://www.gstatic.com/codereviewagent/medium-priority.svg)
   
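   The `original_fields` parameter is unused in this `__init__` method and can
   be removed to clean up the code, provided no caller still passes it (the
   suggested signature moves `name` into the second positional slot).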
   
   ```suggestion
     def __init__(self, source, name=None):
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to