This is an automated email from the ASF dual-hosted git repository.

damccorm pushed a commit to branch release-2.72
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/release-2.72 by this push:
     new 48ed9ad007b cherry-pick - revert js2py to pythonmonkey into 2.72 release (#37693)
48ed9ad007b is described below

commit 48ed9ad007b89627d73aa7b2817da47b579d28c8
Author: Derrick Williams <[email protected]>
AuthorDate: Tue Feb 24 10:00:31 2026 -0500

    cherry-pick - revert js2py to pythonmonkey into 2.72 release (#37693)
    
    * Revert "[yaml] : switch js2py to pythonmonkey (#37560)"
    
    This reverts commit 5d6cb04ea05b5e981ac7fbf0122135870d7b2a4c.
    
    * remove change info on js2py
---
 CHANGES.md                                    |   2 +-
 sdks/python/apache_beam/yaml/standard_io.yaml |   1 -
 sdks/python/apache_beam/yaml/yaml_mapping.py  | 249 +++++++++-----------------
 sdks/python/apache_beam/yaml/yaml_udf_test.py |  21 +--
 sdks/python/setup.py                          |   5 +-
 5 files changed, 94 insertions(+), 184 deletions(-)

diff --git a/CHANGES.md b/CHANGES.md
index 15b255d8d87..131e0fe4df4 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -91,7 +91,7 @@
 
 ## Security Fixes
 
-* Fixed [CVE-2024-28397](https://www.cve.org/CVERecord?id=CVE-2024-28397) by switching from js2py to pythonmonkey (Yaml) ([#37560](https://github.com/apache/beam/issues/37560)).
+* Fixed [CVE-YYYY-NNNN](https://www.cve.org/CVERecord?id=CVE-YYYY-NNNN) (Java/Python/Go) ([#X](https://github.com/apache/beam/issues/X)).
 
 ## Known Issues
 
diff --git a/sdks/python/apache_beam/yaml/standard_io.yaml b/sdks/python/apache_beam/yaml/standard_io.yaml
index d510d73562c..e62b3a562c3 100644
--- a/sdks/python/apache_beam/yaml/standard_io.yaml
+++ b/sdks/python/apache_beam/yaml/standard_io.yaml
@@ -145,7 +145,6 @@
         path: 'path'
       'WriteToJson':
         path: 'path'
-        num_shards: 'num_shards'
       'ReadFromParquet':
         path: 'file_pattern'
       'WriteToParquet':
diff --git a/sdks/python/apache_beam/yaml/yaml_mapping.py b/sdks/python/apache_beam/yaml/yaml_mapping.py
index 69b38db4173..a6b2b570475 100644
--- a/sdks/python/apache_beam/yaml/yaml_mapping.py
+++ b/sdks/python/apache_beam/yaml/yaml_mapping.py
@@ -16,15 +16,8 @@
 #
 
 """This module defines the basic MapToFields operation."""
-import atexit
-import importlib
 import itertools
-import os
-import queue
 import re
-import sys
-import threading
-import uuid
 from collections import abc
 from collections.abc import Callable
 from collections.abc import Collection
@@ -60,6 +53,14 @@ from apache_beam.yaml.yaml_errors import maybe_with_exception_handling
 from apache_beam.yaml.yaml_errors import maybe_with_exception_handling_transform_fn
 from apache_beam.yaml.yaml_provider import dicts_to_rows
 
+# Import js2py package if it exists
+try:
+  import js2py
+  from js2py.base import JsObjectWrapper
+except ImportError:
+  js2py = None
+  JsObjectWrapper = object
+
 _str_expression_fields = {
     'AssignTimestamps': 'timestamp',
     'Filter': 'keep',
@@ -177,6 +178,20 @@ def _check_mapping_arguments(
     raise ValueError(f'{transform_name} cannot specify "name" without "path"')
 
 
+# js2py's JsObjectWrapper object has a self-referencing __dict__ property
+# that cannot be pickled without implementing the __getstate__ and
+# __setstate__ methods.
+class _CustomJsObjectWrapper(JsObjectWrapper):
+  def __init__(self, js_obj):
+    super().__init__(js_obj.__dict__['_obj'])
+
+  def __getstate__(self):
+    return self.__dict__.copy()
+
+  def __setstate__(self, state):
+    self.__dict__.update(state)
+
+
 # TODO(yaml) Improve type inferencing for JS UDF's
 def py_value_to_js_dict(py_value):
   if ((isinstance(py_value, tuple) and hasattr(py_value, '_asdict')) or
@@ -190,181 +205,85 @@ def py_value_to_js_dict(py_value):
     return py_value
 
 
-class PythonMonkeyDispatcher:
-  """Dispatcher for executing JavaScript code using pythonmonkey.
-
-  This class manages a worker thread to execute JavaScript, ensuring that
-  pythonmonkey is only imported and used within that thread. It also handles
-  process shutdown carefully to avoid segmentation faults known to occur
-  when pythonmonkey is present during standard Python interpreter finalization.
-  """
-  def __init__(self):
-    self._req_queue = queue.Queue()
-    self._resp_events = {}
-    self._resp_data = {}
-    self._lock = threading.Lock()
-    self._thread = threading.Thread(target=self._worker, daemon=True)
-    self._started = False
-    # Register the stop method to be called on exit.
-    # atexit handlers are executed in LIFO order. By registering at import time,
-    # we ensure this handler runs last, allowing other cleanup handlers
-    # (registered later) to execute first.
-    atexit.register(self.stop)
-
-  def start(self):
-    with self._lock:
-      if not self._started:
-        self._thread.start()
-        self._started = True
-
-  def stop(self):
-    # This method is called on process exit.
-    if not self._started:
-      return
-    # Flush standard streams before forced exit to avoid data loss.
-    try:
-      sys.stdout.flush()
-      sys.stderr.flush()
-    except Exception:
-      pass
-    # Force an immediate exit to avoid a segmentation fault that occurs with
-    # pythonmonkey during standard interpreter finalization.
-    # Since this runs as one of the last atexit handlers (due to import-time
-    # registration), most other cleanup should have already completed.
-    os._exit(0)
-
-  def _worker(self):
-    try:
-      import pythonmonkey as pm
-    except ImportError:
-      pm = None
-
-    self._pm = pm
-    self._cache = {}
-
-    while True:
-      req = self._req_queue.get()
-      if req is None:
-        break
-
-      req_id, type_str, payload = req
-      res = None
-      is_err = False
-      try:
-        if self._pm is None:
-          raise ImportError(
-              "PythonMonkey not installed or failed to import in worker 
thread."
-          )
-
-        if type_str == 'exec':
-          source, row = payload
-          if source not in self._cache:
-            self._cache[source] = self._pm.eval(f"({source})")
-          func = self._cache[source]
-          res = func(row)
-      except Exception as e:
-        res = e
-        is_err = True
-
-      with self._lock:
-        if req_id in self._resp_events:
-          self._resp_data[req_id] = (is_err, res)
-          self._resp_events[req_id].set()
-
-  def eval_and_run(self, source, row):
-    if not self._started:
-      self.start()
-
-    req_id = str(uuid.uuid4())
-    event = threading.Event()
-    with self._lock:
-      self._resp_events[req_id] = event
-
-    self._req_queue.put((req_id, 'exec', (source, row)))
-    event.wait()
-
-    with self._lock:
-      is_err, result = self._resp_data.pop(req_id)
-      del self._resp_events[req_id]
-
-    if is_err:
-      raise result
-    return result
-
-
-_pythonmonkey_dispatcher = PythonMonkeyDispatcher()
-
-
-class JavaScriptCallable:
-  def __init__(self, source, name=None):
-    self._source = source
-    self._name = name
-
-  def __call__(self, row):
-    # Check for pythonmonkey availability lazily (on first call)
-    if importlib.util.find_spec("pythonmonkey") is None:
-      raise RuntimeError(
-          "PythonMonkey is not installed. Please install 'apache_beam[yaml]' "
-          "to use JavaScript mapping functions.")
-
-    row_as_dict = py_value_to_js_dict(row)
-    try:
-      # If we have a name, it means we evaluated a file and need to call
-      # a specific function.
-      # Dispatcher expects a self-contained source/expression.
-      if self._name:
-        # Wrap: (function() { <source>; return <name>; })()
-        effective_source = (
-            f"(function() {{ {self._source}; return {self._name}; }})()")
-      else:
-        # Expression/Callable case: Wrap in parens to be safe
-        effective_source = f"({self._source})"
-
-      js_result = _pythonmonkey_dispatcher.eval_and_run(
-          effective_source, row_as_dict)
-
-    except Exception as exn:
-      raise RuntimeError(
-          f"Error evaluating javascript expression: {exn}") from exn
-    return dicts_to_rows(_finalize_js_result(js_result))
-
-
-def _finalize_js_result(obj):
-  """Coerce pythonmonkey objects to native Python objects (specifically
-  strings).
-  """
-  if isinstance(obj, str):
-    return str(obj)
-  if isinstance(obj, list):
-    return [_finalize_js_result(x) for x in obj]
-  if isinstance(obj, dict):
-    return {k: _finalize_js_result(v) for k, v in obj.items()}
-  return obj
-
-
+# TODO(yaml) Consider adding optional language version parameter to support
+#  ECMAScript 5 and 6
 def _expand_javascript_mapping_func(
     original_fields, expression=None, callable=None, path=None, name=None):
 
-  if importlib.util.find_spec("pythonmonkey") is None:
+  # Check for installed js2py package
+  if js2py is None:
     raise ValueError(
-        "PythonMonkey is not installed. Please install 'apache_beam[yaml]' "
-        "to use JavaScript mapping functions.")
+        "Javascript mapping functions are not supported on"
+        " Python 3.12 or later.")
+
+  # import remaining js2py objects
+  from js2py import base
+  from js2py.constructors import jsdate
+  from js2py.internals import simplex
+
+  js_array_type = (
+      base.PyJsArray,
+      base.PyJsArrayBuffer,
+      base.PyJsInt8Array,
+      base.PyJsUint8Array,
+      base.PyJsUint8ClampedArray,
+      base.PyJsInt16Array,
+      base.PyJsUint16Array,
+      base.PyJsInt32Array,
+      base.PyJsUint32Array,
+      base.PyJsFloat32Array,
+      base.PyJsFloat64Array)
+
+  def _js_object_to_py_object(obj):
+    if isinstance(obj, (base.PyJsNumber, base.PyJsString, base.PyJsBoolean)):
+      return base.to_python(obj)
+    elif isinstance(obj, js_array_type):
+      return [_js_object_to_py_object(value) for value in obj.to_list()]
+    elif isinstance(obj, jsdate.PyJsDate):
+      return obj.to_utc_dt()
+    elif isinstance(obj, (base.PyJsNull, base.PyJsUndefined)):
+      return None
+    elif isinstance(obj, base.PyJsError):
+      raise RuntimeError(obj['message'])
+    elif isinstance(obj, base.PyJsObject):
+      return {
+          key: _js_object_to_py_object(value['value'])
+          for (key, value) in obj.own.items()
+      }
+    elif isinstance(obj, base.JsObjectWrapper):
+      return _js_object_to_py_object(obj._obj)
+
+    return obj
 
   if expression:
     source = '\n'.join(['function(__row__) {'] + [
         f'  {name} = __row__.{name}'
         for name in original_fields if name in expression
     ] + ['  return (' + expression + ')'] + ['}'])
-    return JavaScriptCallable(source)
+    js_func = _CustomJsObjectWrapper(js2py.eval_js(source))
 
   elif callable:
-    return JavaScriptCallable(callable)
+    js_func = _CustomJsObjectWrapper(js2py.eval_js(callable))
 
   else:
     if not path.endswith('.js'):
       raise ValueError(f'File "{path}" is not a valid .js file.')
     udf_code = FileSystems.open(path).read().decode()
-    return JavaScriptCallable(udf_code, name=name)
+    js = js2py.EvalJs()
+    js.eval(udf_code)
+    js_func = _CustomJsObjectWrapper(getattr(js, name))
+
+  def js_wrapper(row):
+    row_as_dict = py_value_to_js_dict(row)
+    try:
+      js_result = js_func(row_as_dict)
+    except simplex.JsException as exn:
+      raise RuntimeError(
+          f"Error evaluating javascript expression: "
+          f"{exn.mes['message']}") from exn
+    return dicts_to_rows(_js_object_to_py_object(js_result))
+
+  return js_wrapper
 
 
 def _expand_python_mapping_func(
diff --git a/sdks/python/apache_beam/yaml/yaml_udf_test.py b/sdks/python/apache_beam/yaml/yaml_udf_test.py
index e6d0a0af41a..3d664ab9de4 100644
--- a/sdks/python/apache_beam/yaml/yaml_udf_test.py
+++ b/sdks/python/apache_beam/yaml/yaml_udf_test.py
@@ -14,7 +14,6 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
-import importlib
 import logging
 import os
 import shutil
@@ -32,17 +31,11 @@ from apache_beam.yaml.yaml_mapping import py_value_to_js_dict
 from apache_beam.yaml.yaml_provider import dicts_to_rows
 from apache_beam.yaml.yaml_transform import YamlTransform
 
-# We use find_spec to check for pythonmonkey availability without importing it.
-# Importing pythonmonkey initializes the engine and binds it to the current
-# thread (MainThread). This causes "too much recursion" errors when the
-# Dispatcher later tries to use it from a background thread.
 try:
-  pm_available = importlib.util.find_spec("pythonmonkey") is not None
+  import js2py
 except ImportError:
-  pm_available = False
-
-if not pm_available:
-  logging.warning('pythonmonkey is not installed; some tests will be skipped.')
+  js2py = None
+  logging.warning('js2py is not installed; some tests will be skipped.')
 
 
 def as_rows():
@@ -70,7 +63,7 @@ class YamlUDFMappingTest(unittest.TestCase):
   def tearDown(self):
     shutil.rmtree(self.tmpdir)
 
-  @unittest.skipIf(not pm_available, 'pythonmonkey not installed.')
+  @unittest.skipIf(js2py is None, 'js2py not installed.')
   def test_map_to_fields_filter_inline_js(self):
     with beam.Pipeline(options=beam.options.pipeline_options.PipelineOptions(
         pickle_library='cloudpickle', yaml_experimental_features=['javascript'
@@ -204,7 +197,7 @@ class YamlUDFMappingTest(unittest.TestCase):
               beam.Row(label='389a', timestamp=2, label_copy="389a"),
           ]))
 
-  @unittest.skipIf(not pm_available, 'pythonmonkey not installed.')
+  @unittest.skipIf(js2py is None, 'js2py not installed.')
   def test_filter_inline_js(self):
     with beam.Pipeline(options=beam.options.pipeline_options.PipelineOptions(
         pickle_library='cloudpickle', yaml_experimental_features=['javascript'
@@ -259,7 +252,7 @@ class YamlUDFMappingTest(unittest.TestCase):
                   row=beam.Row(rank=2, values=[7, 8, 9])),
           ]))
 
-  @unittest.skipIf(not pm_available, 'pythonmonkey not installed.')
+  @unittest.skipIf(js2py is None, 'js2py not installed.')
   def test_filter_expression_js(self):
     with beam.Pipeline(options=beam.options.pipeline_options.PipelineOptions(
         pickle_library='cloudpickle', yaml_experimental_features=['javascript'
@@ -303,7 +296,7 @@ class YamlUDFMappingTest(unittest.TestCase):
                   row=beam.Row(rank=0, values=[1, 2, 3])),
           ]))
 
-  @unittest.skipIf(not pm_available, 'pythonmonkey not installed.')
+  @unittest.skipIf(js2py is None, 'js2py not installed.')
   def test_filter_inline_js_file(self):
     data = '''
     function f(x) {
diff --git a/sdks/python/setup.py b/sdks/python/setup.py
index 8500830a17b..3b195cdbc08 100644
--- a/sdks/python/setup.py
+++ b/sdks/python/setup.py
@@ -574,9 +574,8 @@ if __name__ == '__main__':
               'docstring-parser>=0.15,<1.0',
               'jinja2>=3.0,<3.2',
               'virtualenv-clone>=0.5,<1.0',
-              # pythonmonkey is used for Javascript mapping support
-              # Please install NPM and Node.js before installing PythonMonkey.
-              'pythonmonkey>=1.3.0',
+              # https://github.com/PiotrDabkowski/Js2Py/issues/317
+              'js2py>=0.74,<1; python_version<"3.12"',
               'jsonschema>=4.0.0,<5.0.0',
           ] + dataframe_dependency,
           # Keep the following dependencies in line with what we test against

Reply via email to