gemini-code-assist[bot] commented on code in PR #38236:
URL: https://github.com/apache/beam/pull/38236#discussion_r3184196023


##########
sdks/python/apache_beam/yaml/yaml_mapping.py:
##########
@@ -16,9 +16,10 @@
 #
 
 """This module defines the basic MapToFields operation."""
+
+import datetime
 import itertools
 import re
-from collections import abc
 from collections.abc import Callable
 from collections.abc import Collection
 from collections.abc import Iterable

Review Comment:
   ![medium](https://www.gstatic.com/codereviewagent/medium-priority.svg)
   
   The `Mapping` type is used in the new `js_to_py` function but is not imported from `collections.abc`.
   
   ```suggestion
   from collections.abc import Callable
   from collections.abc import Collection
   from collections.abc import Iterable
   from collections.abc import Mapping
   ```



##########
sdks/python/apache_beam/yaml/yaml_mapping.py:
##########
@@ -636,8 +689,17 @@ def _PyJsFilter(
     error_handling: Whether and where to output records that throw errors when
       the above expressions are evaluated.
   """  # pylint: disable=line-too-long
-  keep_fn = _as_callable_for_pcoll(pcoll, keep, "keep", language or 'generic')
-  return pcoll | beam.Filter(keep_fn)
+  if language == 'javascript':
+    if isinstance(keep, str):
+      keep = {'expression': keep}
+    udf_code, function_name = _get_javascript_udf_code(
+        [f.name for f in schema_from_element_type(pcoll.element_type).fields],
+        **keep
+    )
+    return pcoll | beam.ParDo(JsFilterDoFn(udf_code, function_name))
+  else:
+    keep_fn = _as_callable_for_pcoll(pcoll, keep, "keep", language or 
'generic')
+    return pcoll | beam.Filter(keep_fn)

Review Comment:
   ![high](https://www.gstatic.com/codereviewagent/high-priority.svg)
   
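   The `error_handling` configuration is ignored when using JavaScript. This is a regression from the previous implementation, where errors raised during UDF execution could be captured and redirected. Consider wrapping the `ParDo` with error-handling logic so that records whose UDF evaluation throws are routed to the configured error output instead of propagating the exception.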



##########
sdks/python/apache_beam/yaml/yaml_mapping.py:
##########
@@ -709,10 +771,12 @@ def _PyJsMapToFields(
   """  # pylint: disable=line-too-long
   input_schema, fields = normalize_fields(
       pcoll, fields, drop or (), append, language=language or 'generic')
+  original_fields = list(input_schema.keys())
+
   if language == 'javascript':
     options.YamlOptions.check_enabled(pcoll.pipeline, 'javascript')
-
-  original_fields = list(input_schema.keys())
+    return pcoll | beam.ParDo(
+        JsMapToFieldsDoFn(fields, original_fields, input_schema))

Review Comment:
   ![high](https://www.gstatic.com/codereviewagent/high-priority.svg)
   
   The `error_handling` parameter is ignored for JavaScript mappings. Additionally, it is recommended to check whether `MiniRacer` is available at construction time, so that a clear error message is raised if the dependency is missing.
   
   ```python
     if language == 'javascript':
       options.YamlOptions.check_enabled(pcoll.pipeline, 'javascript')
       if MiniRacer is None:
         raise ValueError(
             "JavaScript mapping functions require the 'py-mini-racer' package 
to be installed."
         )
       return pcoll | beam.ParDo(
           JsMapToFieldsDoFn(fields, original_fields, input_schema))
   ```



##########
sdks/python/apache_beam/yaml/yaml_mapping.py:
##########
@@ -53,13 +54,13 @@
 from apache_beam.yaml.yaml_errors import 
maybe_with_exception_handling_transform_fn
 from apache_beam.yaml.yaml_provider import dicts_to_rows

Review Comment:
   ![medium](https://www.gstatic.com/codereviewagent/medium-priority.svg)
   
   The `schema_from_element_type` function is used in `_PyJsFilter` but is not imported. Add `from apache_beam.typehints.schemas import schema_from_element_type` to the imports.
   
   ```suggestion
   from apache_beam.typehints.schemas import schema_from_element_type
   from apache_beam.yaml.yaml_provider import dicts_to_rows
   ```



##########
sdks/python/apache_beam/yaml/yaml_mapping.py:
##########
@@ -178,112 +179,168 @@ def _check_mapping_arguments(
     raise ValueError(f'{transform_name} cannot specify "name" without "path"')
 
 
-# js2py's JsObjectWrapper object has a self-referencing __dict__ property
-# that cannot be pickled without implementing the __getstate__ and
-# __setstate__ methods.
-class _CustomJsObjectWrapper(JsObjectWrapper):
-  def __init__(self, js_obj):
-    super().__init__(js_obj.__dict__['_obj'])
-
-  def __getstate__(self):
-    return self.__dict__.copy()
-
-  def __setstate__(self, state):
-    self.__dict__.update(state)
-
-
 # TODO(yaml) Improve type inferencing for JS UDF's
 def py_value_to_js_dict(py_value):
   if ((isinstance(py_value, tuple) and hasattr(py_value, '_asdict')) or
       isinstance(py_value, beam.Row)):
     py_value = py_value._asdict()
   if isinstance(py_value, dict):
     return {key: py_value_to_js_dict(value) for key, value in py_value.items()}
-  elif not isinstance(py_value, str) and isinstance(py_value, abc.Iterable):
+  elif not isinstance(py_value, str) and isinstance(py_value, Iterable):
     return [py_value_to_js_dict(value) for value in list(py_value)]
   else:
     return py_value
 
 
-# TODO(yaml) Consider adding optional language version parameter to support
-#  ECMAScript 5 and 6
-def _expand_javascript_mapping_func(
-    original_fields, expression=None, callable=None, path=None, name=None):
-
-  # Check for installed js2py package
-  if js2py is None:
-    raise ValueError(
-        "Javascript mapping functions are not supported on"
-        " Python 3.12 or later.")
-
-  # import remaining js2py objects
-  from js2py import base
-  from js2py.constructors import jsdate
-  from js2py.internals import simplex
-
-  js_array_type = (
-      base.PyJsArray,
-      base.PyJsArrayBuffer,
-      base.PyJsInt8Array,
-      base.PyJsUint8Array,
-      base.PyJsUint8ClampedArray,
-      base.PyJsInt16Array,
-      base.PyJsUint16Array,
-      base.PyJsInt32Array,
-      base.PyJsUint32Array,
-      base.PyJsFloat32Array,
-      base.PyJsFloat64Array)
-
-  def _js_object_to_py_object(obj):
-    if isinstance(obj, (base.PyJsNumber, base.PyJsString, base.PyJsBoolean)):
-      return base.to_python(obj)
-    elif isinstance(obj, js_array_type):
-      return [_js_object_to_py_object(value) for value in obj.to_list()]
-    elif isinstance(obj, jsdate.PyJsDate):
-      return obj.to_utc_dt()
-    elif isinstance(obj, (base.PyJsNull, base.PyJsUndefined)):
-      return None
-    elif isinstance(obj, base.PyJsError):
-      raise RuntimeError(obj['message'])
-    elif isinstance(obj, base.PyJsObject):
-      return {
-          key: _js_object_to_py_object(value['value'])
-          for (key, value) in obj.own.items()
-      }
-    elif isinstance(obj, base.JsObjectWrapper):
-      return _js_object_to_py_object(obj._obj)
-
+def js_to_py(obj):
+  """Converts mini-racer mapped objects to standard Python types.
+  
+  This is needed because ctx.eval returns objects that implement Mapping
+  and Iterable but are not picklable (like JSMappedObjectImpl and JSArrayImpl),
+  which would fail when Beam tries to serialize rows containing them.
+  We also preserve datetime objects which are correctly produced by ctx.eval
+  for JS Date objects.
+  """
+  if isinstance(obj, datetime.datetime):
+    return obj
+  elif isinstance(obj, Mapping):
+    return {k: js_to_py(v) for k, v in obj.items()}
+  elif not isinstance(obj, (str, bytes)) and isinstance(obj, Iterable):
+    return [js_to_py(v) for v in obj]
+  elif isinstance(obj, str):
+    if _JS_DATE_ISO_REGEX.match(obj):
+      try:
+        return datetime.datetime.fromisoformat(obj[:-1] + '+00:00')
+      except ValueError:
+        return obj
+    return obj
+  else:
     return obj
 
-  if expression:
-    source = '\n'.join(['function(__row__) {'] + [
-        f'  {name} = __row__.{name}'
-        for name in original_fields if name in expression
-    ] + ['  return (' + expression + ')'] + ['}'])
-    js_func = _CustomJsObjectWrapper(js2py.eval_js(source))
 
-  elif callable:
-    js_func = _CustomJsObjectWrapper(js2py.eval_js(callable))
+class JsFilterDoFn(beam.DoFn):
+  def __init__(self, udf_code, function_name):
+    self.udf_code = udf_code
+    self.function_name = function_name
+    self.ctx = None
+
+  def setup(self):
+    self.ctx = MiniRacer()
+    self.ctx.eval(self.udf_code)
+
+  def process(self, element):
+    row_as_dict = py_value_to_js_dict(element)
+    result = self.ctx.call(self.function_name, row_as_dict)
+    result = js_to_py(result)
+    if result:
+      yield element
+
+
+class JsMapToFieldsDoFn(beam.DoFn):
+  def __init__(self, fields, original_fields, input_schema):
+    self.ctx = None
+    self.field_funcs = {}
+    self.passthrough_fields = []
+
+    script = []
+    for i, (name, expr) in enumerate(fields.items()):
+      if isinstance(expr, str) and expr in input_schema:
+        self.passthrough_fields.append((name, expr))
+        continue
+
+      if isinstance(expr, str):
+        expr = {'expression': expr}
+
+      # We use numeric indexing (func_{i}) instead of reusing the output field
+      # name to prevent syntax errors if output names contain spaces or 
hyphens.
+      # We also use bracket notation for robustness against input field names
+      # that aren't compliant dot-access identifiers.
+      if 'expression' in expr:
+        e = expr['expression']
+        js_identifier_pattern = re.compile(r'^[a-zA-Z_$][a-zA-Z0-9_$]*$')

Review Comment:
   ![medium](https://www.gstatic.com/codereviewagent/medium-priority.svg)
   
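   The regular expression `js_identifier_pattern` is compiled inside the loop, once per field, even though it does not depend on any loop variable. Python's `re` module caches compiled patterns, so each call costs a cache lookup rather than a full recompilation. It is still redundant work, so define the pattern once as a module-level constant.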



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to