gemini-code-assist[bot] commented on code in PR #38473:
URL: https://github.com/apache/beam/pull/38473#discussion_r3292003035
##########
sdks/python/apache_beam/yaml/yaml_mapping.py:
##########
@@ -199,6 +216,15 @@ def py_value_to_js_dict(py_value):
py_value = py_value._asdict()
if isinstance(py_value, dict):
return {key: py_value_to_js_dict(value) for key, value in py_value.items()}
+ elif isinstance(py_value, bytes):
+ return py_value.decode('utf-8', errors='replace')
Review Comment:

Decoding `bytes` as UTF-8 with `errors='replace'` can lead to data
corruption if the input contains arbitrary binary data (e.g., images,
serialized protos): every invalid byte sequence is replaced with U+FFFD, so
the original bytes cannot be recovered. Since `quickjs-ng` supports mapping
Python `bytes` to JavaScript `ArrayBuffer`, it is safer to pass the bytes
directly unless a string representation is explicitly required for the JS
logic.
```suggestion
elif isinstance(py_value, bytes):
return py_value
```
##########
sdks/python/apache_beam/yaml/yaml_mapping.py:
##########
@@ -178,18 +180,33 @@ def _check_mapping_arguments(
raise ValueError(f'{transform_name} cannot specify "name" without "path"')
-# js2py's JsObjectWrapper object has a self-referencing __dict__ property
-# that cannot be pickled without implementing the __getstate__ and
-# __setstate__ methods.
-class _CustomJsObjectWrapper(JsObjectWrapper):
- def __init__(self, js_obj):
- super().__init__(js_obj.__dict__['_obj'])
+_THREAD_LOCAL_JS_CACHE = threading.local()
- def __getstate__(self):
- return self.__dict__.copy()
- def __setstate__(self, state):
- self.__dict__.update(state)
+class _JsFunctionWrapper:
+ def __init__(self, source_code, entrypoint_name):
+ self.source_code = source_code
+ self.entrypoint_name = entrypoint_name
+
+ def _get_fn(self):
+ cache = _THREAD_LOCAL_JS_CACHE
+ if not hasattr(cache, 'functions'):
+ cache.functions = {}
+
+ cache_key = (self.source_code, self.entrypoint_name)
+ if cache_key not in cache.functions:
+ cache.functions[cache_key] = quickjs.Function(
+ self.entrypoint_name, self.source_code)
Review Comment:

The `quickjs.Function` API is not part of the standard `quickjs-ng` package
(which is specified in `setup.py`). In `quickjs-ng`, you typically create a
`Context` and then use `context.eval()` or `context.get()` to retrieve
functions. Using an unsupported API will result in an `AttributeError` at
runtime.
```suggestion
context = quickjs.Context()
context.eval(self.source_code)
cache.functions[cache_key] = context.get(self.entrypoint_name)
```
##########
sdks/python/apache_beam/yaml/yaml_mapping.py:
##########
@@ -210,80 +236,53 @@ def py_value_to_js_dict(py_value):
def _expand_javascript_mapping_func(
original_fields, expression=None, callable=None, path=None, name=None):
- # Check for installed js2py package
- if js2py is None:
+ if quickjs is None:
raise ValueError(
- "Javascript mapping functions are not supported on"
- " Python 3.12 or later.")
-
- # import remaining js2py objects
- from js2py import base
- from js2py.constructors import jsdate
- from js2py.internals import simplex
-
- js_array_type = (
- base.PyJsArray,
- base.PyJsArrayBuffer,
- base.PyJsInt8Array,
- base.PyJsUint8Array,
- base.PyJsUint8ClampedArray,
- base.PyJsInt16Array,
- base.PyJsUint16Array,
- base.PyJsInt32Array,
- base.PyJsUint32Array,
- base.PyJsFloat32Array,
- base.PyJsFloat64Array)
-
- def _js_object_to_py_object(obj):
- if isinstance(obj, (base.PyJsNumber, base.PyJsString, base.PyJsBoolean)):
- return base.to_python(obj)
- elif isinstance(obj, js_array_type):
- return [_js_object_to_py_object(value) for value in obj.to_list()]
- elif isinstance(obj, jsdate.PyJsDate):
- return obj.to_utc_dt()
- elif isinstance(obj, (base.PyJsNull, base.PyJsUndefined)):
- return None
- elif isinstance(obj, base.PyJsError):
- raise RuntimeError(obj['message'])
- elif isinstance(obj, base.PyJsObject):
- return {
- key: _js_object_to_py_object(value['value'])
- for (key, value) in obj.own.items()
- }
- elif isinstance(obj, base.JsObjectWrapper):
- return _js_object_to_py_object(obj._obj)
-
- return obj
+ "Javascript mapping functions are not supported because the "
+ "quickjs-ng library is not installed.")
if expression:
- source = '\n'.join(['function(__row__) {'] + [
- f' {name} = __row__.{name}'
- for name in original_fields if name in expression
- ] + [' return (' + expression + ')'] + ['}'])
- js_func = _CustomJsObjectWrapper(js2py.eval_js(source))
+ source_code = f"""
+ function udf(__row__) {{
+ with (__row__) {{
+ return ({expression});
+ }}
+ }}
+ """
+ user_entrypoint = 'udf'
elif callable:
- js_func = _CustomJsObjectWrapper(js2py.eval_js(callable))
+ source_code = f"var __udf__ = ({callable});"
+ user_entrypoint = '__udf__'
else:
if not path.endswith('.js'):
raise ValueError(f'File "{path}" is not a valid .js file.')
udf_code = FileSystems.open(path).read().decode()
- js = js2py.EvalJs()
- js.eval(udf_code)
- js_func = _CustomJsObjectWrapper(getattr(js, name))
-
- def js_wrapper(row):
- row_as_dict = py_value_to_js_dict(row)
- try:
- js_result = js_func(row_as_dict)
- except simplex.JsException as exn:
- raise RuntimeError(
- f"Error evaluating javascript expression: "
- f"{exn.mes['message']}") from exn
- return dicts_to_rows(_js_object_to_py_object(js_result))
+ source_code = udf_code
+ user_entrypoint = name
+
+ source_code += f"""
Review Comment:

When concatenating the UDF code with the wrapper logic, there is a risk of
syntax errors if the original code ends with a `//` line comment (which would
swallow the appended text) or lacks a trailing newline/semicolon. Adding a
newline ensures the appended functions are parsed correctly.
```suggestion
source_code += "\n" + f"""
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]