gemini-code-assist[bot] commented on code in PR #38236:
URL: https://github.com/apache/beam/pull/38236#discussion_r3133569457


##########
sdks/python/apache_beam/yaml/yaml_mapping.py:
##########
@@ -636,8 +675,17 @@ def _PyJsFilter(
     error_handling: Whether and where to output records that throw errors when
       the above expressions are evaluated.
   """  # pylint: disable=line-too-long
-  keep_fn = _as_callable_for_pcoll(pcoll, keep, "keep", language or 'generic')
-  return pcoll | beam.Filter(keep_fn)
+  if language == 'javascript':
+    if isinstance(keep, str):
+      keep = {'expression': keep}
+    udf_code, function_name = _get_javascript_udf_code(
+        [f.name for f in schema_from_element_type(pcoll.element_type).fields],
+        **keep
+    )
+    return pcoll | beam.ParDo(JsFilterDoFn(udf_code, function_name))

Review Comment:
   ![high](https://www.gstatic.com/codereviewagent/high-priority.svg)
   
   Applying `beam.ParDo(JsFilterDoFn(...))` directly bypasses the 
`error_handling` configuration. Wrap this transform with 
`maybe_with_exception_handling_transform_fn` so that errors raised during 
JavaScript execution are caught and handled as the YAML specification requires 
(e.g., redirected to a dead-letter queue).
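A minimal, Beam-free sketch of the behavior this comment asks for. `filter_with_dead_letter` is a hypothetical helper (not a Beam API) standing in for what the error-handling wrapper is meant to provide: elements whose predicate raises are routed to an error output instead of failing the bundle. The error-record shape (`element`, `msg`) is an assumption for illustration.

```python
from typing import Any, Callable, Dict, Iterable, List, Tuple


def filter_with_dead_letter(
    elements: Iterable[Any],
    keep: Callable[[Any], Any],
) -> Tuple[List[Any], List[Dict[str, Any]]]:
  """Hypothetical helper: splits elements into kept rows and error records.

  Mirrors the intent of wrapping JsFilterDoFn with error handling: an
  exception raised while evaluating `keep` is captured as an error record
  rather than propagating and failing the pipeline.
  """
  kept, errors = [], []
  for element in elements:
    try:
      if keep(element):
        kept.append(element)
    except Exception as exc:  # e.g. a JS evaluation error surfaced in Python
      errors.append({'element': element, 'msg': str(exc)})
  return kept, errors


rows = [{'x': 1}, {'x': 5}, {'y': 2}]  # the last row lacks the 'x' field
kept, errors = filter_with_dead_letter(rows, lambda row: row['x'] > 2)
```

Without the `try`/`except`, the `KeyError` from the last row would propagate and fail the whole bundle, which is what an unwrapped `ParDo` does today.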



##########
sdks/python/apache_beam/yaml/yaml_mapping.py:
##########
@@ -205,85 +195,138 @@ def py_value_to_js_dict(py_value):
     return py_value
 
 
-# TODO(yaml) Consider adding optional language version parameter to support
-#  ECMAScript 5 and 6
-def _expand_javascript_mapping_func(
-    original_fields, expression=None, callable=None, path=None, name=None):
+def js_to_py(obj):
+  """Converts mini-racer mapped objects to standard Python types.
+  
+  This is needed because ctx.eval returns objects that implement Mapping
+  and Iterable but are not picklable (like JSMappedObjectImpl and JSArrayImpl),
+  which would fail when Beam tries to serialize rows containing them.
+  We also preserve datetime objects which are correctly produced by ctx.eval
+  for JS Date objects.
+  """
+  if isinstance(obj, datetime.datetime):
+    return obj
+  elif isinstance(obj, Mapping):
+    return {k: js_to_py(v) for k, v in obj.items()}
+  elif not isinstance(obj, (str, bytes)) and isinstance(obj, Iterable):

Review Comment:
   ![high](https://www.gstatic.com/codereviewagent/high-priority.svg)
   
   The names `Mapping` and `Iterable` are not defined in this scope. Since 
`collections.abc` is imported as `abc` on line 25, you should use `abc.Mapping` 
and `abc.Iterable` to avoid a `NameError` at runtime.
   
   ```suggestion
     elif isinstance(obj, abc.Mapping):
       return {k: js_to_py(v) for k, v in obj.items()}
     elif not isinstance(obj, (str, bytes)) and isinstance(obj, abc.Iterable):
   ```
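A self-contained sketch of the corrected function, using `abc.Mapping` and `abc.Iterable` as suggested. `_JS_DATE_ISO_REGEX` is defined elsewhere in the module and is not shown in the diff, so the pattern below is an assumption; `FakeJsObject` is a hypothetical stand-in for mini-racer's mapped-object proxy.

```python
import datetime
import re
from collections import abc

# Assumed pattern for JS Date.toISOString() output, e.g. '2024-01-02T03:04:05.678Z'.
_JS_DATE_ISO_REGEX = re.compile(
    r'^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}(?:\.\d{3})?Z$')


def js_to_py(obj):
  """Recursively converts Mapping/Iterable proxies to plain dicts and lists."""
  if isinstance(obj, datetime.datetime):
    return obj
  elif isinstance(obj, abc.Mapping):
    return {k: js_to_py(v) for k, v in obj.items()}
  elif not isinstance(obj, (str, bytes)) and isinstance(obj, abc.Iterable):
    return [js_to_py(v) for v in obj]
  elif isinstance(obj, str) and _JS_DATE_ISO_REGEX.match(obj):
    try:
      # Replace the trailing 'Z' so fromisoformat accepts it on Python < 3.11.
      return datetime.datetime.fromisoformat(obj[:-1] + '+00:00')
    except ValueError:
      return obj
  return obj


class FakeJsObject(abc.Mapping):
  """Hypothetical stand-in for a non-picklable mini-racer object proxy."""

  def __init__(self, data):
    self._data = data

  def __getitem__(self, key):
    return self._data[key]

  def __iter__(self):
    return iter(self._data)

  def __len__(self):
    return len(self._data)


converted = js_to_py(
    FakeJsObject({'a': 1, 'tags': ('x', 'y'), 'ts': '2024-01-02T03:04:05.678Z'}))
```

Because the result is built only from `dict`, `list`, and `datetime` values, it no longer carries the proxy types that break pickling.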



##########
sdks/python/apache_beam/yaml/yaml_mapping.py:
##########
@@ -709,10 +757,12 @@ def _PyJsMapToFields(
   """  # pylint: disable=line-too-long
   input_schema, fields = normalize_fields(
       pcoll, fields, drop or (), append, language=language or 'generic')
+  original_fields = list(input_schema.keys())
+
   if language == 'javascript':
     options.YamlOptions.check_enabled(pcoll.pipeline, 'javascript')
-
-  original_fields = list(input_schema.keys())
+    return pcoll | beam.ParDo(
+        JsMapToFieldsDoFn(fields, original_fields, input_schema))

Review Comment:
   ![high](https://www.gstatic.com/codereviewagent/high-priority.svg)
   
   This `ParDo` also ignores the `error_handling` parameter. It should be 
wrapped with `maybe_with_exception_handling_transform_fn` to maintain 
consistency with other YAML transforms and support error redirection for failed 
JS evaluations.
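For `MapToFields`, the same idea applies per row. In the plain-Python sketch below, a failure in any field expression routes the whole input row to the error output rather than emitting a partially populated row. `map_fields_with_dead_letter` and the error-record shape are hypothetical, and the per-field callables stand in for the compiled `func_<name>` JavaScript functions.

```python
from typing import Any, Callable, Dict, Iterable, List, Tuple

Row = Dict[str, Any]


def map_fields_with_dead_letter(
    rows: Iterable[Row],
    field_fns: Dict[str, Callable[[Row], Any]],
) -> Tuple[List[Row], List[Dict[str, Any]]]:
  """Hypothetical helper: computes output fields per row, routing failures.

  A row is emitted only if every field function succeeds; otherwise the
  original row and the error message go to the error output.
  """
  outputs, errors = [], []
  for row in rows:
    try:
      outputs.append({name: fn(row) for name, fn in field_fns.items()})
    except Exception as exc:
      errors.append({'element': row, 'msg': f'{type(exc).__name__}: {exc}'})
  return outputs, errors


rows = [{'a': 6, 'b': 3}, {'a': 1, 'b': 0}]
outputs, errors = map_fields_with_dead_letter(
    rows,
    {'a': lambda r: r['a'], 'ratio': lambda r: r['a'] / r['b']},
)
```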



##########
sdks/python/apache_beam/yaml/yaml_mapping.py:
##########
@@ -16,8 +16,12 @@
 #
 
 """This module defines the basic MapToFields operation."""
+
+import datetime
 import itertools
 import re
+import threading
+import uuid

Review Comment:
   ![high](https://www.gstatic.com/codereviewagent/high-priority.svg)
   
   Two imports required by the new functionality are missing: 
`FileSystems` (from `apache_beam.io.filesystems`) and 
`schema_from_element_type` (from `apache_beam.typehints.schemas`). 
Additionally, `threading` and `uuid` appear to be unused in this file and 
should be removed to keep the code clean.
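To double-check which imports are unused, a rough standard-library heuristic can scan the module's syntax tree. `unused_imports` is a hypothetical helper, not a Beam or linter API; pylint's `unused-import` check does the same job more robustly.

```python
import ast
from typing import List, Set


def unused_imports(source: str) -> List[str]:
  """Returns names bound by import statements that never appear as a Name.

  Rough heuristic: names referenced only inside strings, __all__, or via
  getattr/eval are reported as unused. Star imports are skipped.
  """
  tree = ast.parse(source)
  imported: Set[str] = set()
  for node in ast.walk(tree):
    if isinstance(node, ast.Import):
      for alias in node.names:
        # `import a.b` binds `a`; `import a.b as c` binds `c`.
        imported.add(alias.asname or alias.name.split('.')[0])
    elif isinstance(node, ast.ImportFrom):
      for alias in node.names:
        if alias.name != '*':
          imported.add(alias.asname or alias.name)
  used = {node.id for node in ast.walk(tree) if isinstance(node, ast.Name)}
  return sorted(imported - used)


SAMPLE = '''
import datetime
import itertools
import threading
import uuid
from collections import abc

def f(x):
  if isinstance(x, abc.Mapping):
    return datetime.datetime(2024, 1, 1)
  return list(itertools.chain(x))
'''

print(unused_imports(SAMPLE))  # ['threading', 'uuid']
```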



##########
sdks/python/apache_beam/yaml/yaml_mapping.py:
##########
@@ -205,85 +195,138 @@ def py_value_to_js_dict(py_value):
     return py_value
 
 
-# TODO(yaml) Consider adding optional language version parameter to support
-#  ECMAScript 5 and 6
-def _expand_javascript_mapping_func(
-    original_fields, expression=None, callable=None, path=None, name=None):
+def js_to_py(obj):
+  """Converts mini-racer mapped objects to standard Python types.
+  
+  This is needed because ctx.eval returns objects that implement Mapping
+  and Iterable but are not picklable (like JSMappedObjectImpl and JSArrayImpl),
+  which would fail when Beam tries to serialize rows containing them.
+  We also preserve datetime objects which are correctly produced by ctx.eval
+  for JS Date objects.
+  """
+  if isinstance(obj, datetime.datetime):
+    return obj
+  elif isinstance(obj, Mapping):
+    return {k: js_to_py(v) for k, v in obj.items()}
+  elif not isinstance(obj, (str, bytes)) and isinstance(obj, Iterable):
+    return [js_to_py(v) for v in obj]
+  elif isinstance(obj, str):
+    if _JS_DATE_ISO_REGEX.match(obj):
+      try:
+        return datetime.datetime.fromisoformat(obj[:-1] + '+00:00')
+      except ValueError:
+        return obj
+    return obj
+  else:
+    return obj
 
-  # Check for installed js2py package
-  if js2py is None:
-    raise ValueError(
-        "Javascript mapping functions are not supported on"
-        " Python 3.12 or later.")
-
-  # import remaining js2py objects
-  from js2py import base
-  from js2py.constructors import jsdate
-  from js2py.internals import simplex
-
-  js_array_type = (
-      base.PyJsArray,
-      base.PyJsArrayBuffer,
-      base.PyJsInt8Array,
-      base.PyJsUint8Array,
-      base.PyJsUint8ClampedArray,
-      base.PyJsInt16Array,
-      base.PyJsUint16Array,
-      base.PyJsInt32Array,
-      base.PyJsUint32Array,
-      base.PyJsFloat32Array,
-      base.PyJsFloat64Array)
-
-  def _js_object_to_py_object(obj):
-    if isinstance(obj, (base.PyJsNumber, base.PyJsString, base.PyJsBoolean)):
-      return base.to_python(obj)
-    elif isinstance(obj, js_array_type):
-      return [_js_object_to_py_object(value) for value in obj.to_list()]
-    elif isinstance(obj, jsdate.PyJsDate):
-      return obj.to_utc_dt()
-    elif isinstance(obj, (base.PyJsNull, base.PyJsUndefined)):
-      return None
-    elif isinstance(obj, base.PyJsError):
-      raise RuntimeError(obj['message'])
-    elif isinstance(obj, base.PyJsObject):
-      return {
-          key: _js_object_to_py_object(value['value'])
-          for (key, value) in obj.own.items()
-      }
-    elif isinstance(obj, base.JsObjectWrapper):
-      return _js_object_to_py_object(obj._obj)
 
-    return obj
+class JsFilterDoFn(beam.DoFn):
+  def __init__(self, udf_code, function_name):
+    self.udf_code = udf_code
+    self.function_name = function_name
+    self.ctx = None
+
+  def setup(self):
+    self.ctx = MiniRacer()
+    self.ctx.eval(self.udf_code)
+
+  def process(self, element):
+    row_as_dict = py_value_to_js_dict(element)
+    result = self.ctx.call(self.function_name, row_as_dict)
+    result = js_to_py(result)
+    if result:
+      yield element
+
+
+class JsMapToFieldsDoFn(beam.DoFn):
+  def __init__(self, fields, original_fields, input_schema):
+    self.fields = fields
+    self.original_fields = original_fields
+    self.input_schema = input_schema
+    self.ctx = None
+    self.field_funcs = {}
+    self.passthrough_fields = []
+
+  def setup(self):
+    self.ctx = MiniRacer()
+    script = []
+    for name, expr in self.fields.items():
+      if isinstance(expr, str) and expr in self.input_schema:
+        self.passthrough_fields.append((name, expr))
+        continue
+
+      if isinstance(expr, str):
+        expr = {'expression': expr}
+
+      if 'expression' in expr:
+        e = expr['expression']
+        code = f"var func_{name} = (__row__) => {{ " + " ".join([
+            f"const {n} = __row__.{n};" for n in self.original_fields if n in e
+        ]) + f" return ({e}); }}"
+        script.append(code)
+        self.field_funcs[name] = f"func_{name}"
+      elif 'callable' in expr:
+        code = f"var func_{name} = {expr['callable']}"
+        script.append(code)
+        self.field_funcs[name] = f"func_{name}"
+      elif 'path' in expr and 'name' in expr:
+        path = expr['path']
+        func_name = expr['name']
+        udf_code = FileSystems.open(path).read().decode()
+        script.append(udf_code)
+        self.field_funcs[name] = func_name

Review Comment:
   ![medium](https://www.gstatic.com/codereviewagent/medium-priority.svg)
   
   Reading the UDF file inside `JsMapToFieldsDoFn.setup` is inefficient: 
`setup` runs for every DoFn instance on every worker, so the same file is read 
many times. It is also inconsistent with `JsFilterDoFn`, whose code is read at 
expansion time. Read the file content during pipeline construction (expansion) 
and pass the code string to the DoFn constructor instead.
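A minimal sketch of the suggested restructuring, in plain Python. `resolve_js_fields` and `JsMapToFieldsDoFnSketch` are hypothetical names, and `read_text` stands in for something like `FileSystems.open(path).read().decode()`. The file is read once at construction time, and each simulated DoFn instance only assembles code it already holds.

```python
import os
import tempfile
from typing import Callable, Dict, List


def resolve_js_fields(
    fields: Dict[str, dict], read_text: Callable[[str], str]) -> Dict[str, dict]:
  """Hypothetical expansion-time step: inlines file-based UDFs as code strings."""
  resolved = {}
  for name, spec in fields.items():
    if 'path' in spec and 'name' in spec:
      resolved[name] = {'code': read_text(spec['path']), 'name': spec['name']}
    else:
      resolved[name] = spec
  return resolved


class JsMapToFieldsDoFnSketch:
  """Hypothetical DoFn-like class that receives code strings, never paths."""

  def __init__(self, resolved_fields: Dict[str, dict]):
    self.resolved_fields = resolved_fields
    self.script: List[str] = []

  def setup(self):
    # Only assembles already-loaded code; no file I/O happens here.
    self.script = [
        spec['code'] for spec in self.resolved_fields.values() if 'code' in spec
    ]


reads = []


def read_text(path: str) -> str:
  reads.append(path)  # records each read so the test can count them
  with open(path) as f:
    return f.read()


with tempfile.TemporaryDirectory() as tmp:
  udf_path = os.path.join(tmp, 'udf.js')
  with open(udf_path, 'w') as f:
    f.write('function double(row) { return row.x * 2; }')
  resolved = resolve_js_fields(
      {'doubled': {'path': udf_path, 'name': 'double'}}, read_text)

# Simulate setup() on several DoFn instances, after the file is already gone.
instances = [JsMapToFieldsDoFnSketch(resolved) for _ in range(3)]
for dofn in instances:
  dofn.setup()
```

One trade-off of this design is that the code string is serialized along with the DoFn, which is usually acceptable for UDF-sized files.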



##########
sdks/python/setup.py:
##########
@@ -621,8 +621,7 @@ def get_portability_package_data():
               'docstring-parser>=0.15,<1.0',
               'jinja2>=3.0,<3.2',
               'virtualenv-clone>=0.5,<1.0',
-              # https://github.com/PiotrDabkowski/Js2Py/issues/317
-              'js2py>=0.74,<1; python_version<"3.12"',
+              'mini-racer',

Review Comment:
   ![medium](https://www.gstatic.com/codereviewagent/medium-priority.svg)
   
   The correct package name for the V8 bridge is `py-mini-racer`. The 
`mini-racer` package is often an older or different version. Given the import 
`from py_mini_racer import MiniRacer` used in the SDK, `py-mini-racer` should 
be specified as the dependency.
   
   ```suggestion
                 'py-mini-racer',
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
