derrickaw commented on code in PR #38130:
URL: https://github.com/apache/beam/pull/38130#discussion_r3093381269
##########
sdks/python/apache_beam/yaml/yaml_mapping.py:
##########
@@ -210,78 +225,109 @@ def py_value_to_js_dict(py_value):
def _expand_javascript_mapping_func(
original_fields, expression=None, callable=None, path=None, name=None):
- # Check for installed js2py package
- if js2py is None:
+ # Check for installed quickjs package
+ if quickjs is None:
raise ValueError(
- "Javascript mapping functions are not supported on"
- " Python 3.12 or later.")
-
- # import remaining js2py objects
- from js2py import base
- from js2py.constructors import jsdate
- from js2py.internals import simplex
-
- js_array_type = (
- base.PyJsArray,
- base.PyJsArrayBuffer,
- base.PyJsInt8Array,
- base.PyJsUint8Array,
- base.PyJsUint8ClampedArray,
- base.PyJsInt16Array,
- base.PyJsUint16Array,
- base.PyJsInt32Array,
- base.PyJsUint32Array,
- base.PyJsFloat32Array,
- base.PyJsFloat64Array)
-
- def _js_object_to_py_object(obj):
- if isinstance(obj, (base.PyJsNumber, base.PyJsString, base.PyJsBoolean)):
- return base.to_python(obj)
- elif isinstance(obj, js_array_type):
- return [_js_object_to_py_object(value) for value in obj.to_list()]
- elif isinstance(obj, jsdate.PyJsDate):
- return obj.to_utc_dt()
- elif isinstance(obj, (base.PyJsNull, base.PyJsUndefined)):
- return None
- elif isinstance(obj, base.PyJsError):
- raise RuntimeError(obj['message'])
- elif isinstance(obj, base.PyJsObject):
- return {
- key: _js_object_to_py_object(value['value'])
- for (key, value) in obj.own.items()
- }
- elif isinstance(obj, base.JsObjectWrapper):
- return _js_object_to_py_object(obj._obj)
-
- return obj
+ "Javascript mapping functions require the 'quickjs' package.")
+
+ def make_bridge_source(func_name, call_expr):
+ # The bridge function facilitates high-performance data transfer from
+ # Python to QuickJS by reconstructing the row object in JS.
+ # To minimize JSON overhead, primitives are passed directly, while complex
+ # types (lists/dicts) are passed as JSON strings and parsed in JS.
+ # The 'flags' argument indicates which values need parsing.
+ keys_json = json.dumps(list(original_fields))
+ return (
+ f"function {func_name}(serialized_flags, ...values) {{ "
+ f" const keys = {keys_json}; "
+ f" const flags = serialized_flags.split(','); "
+ f" const row = {{}}; "
+ f" for (let i = 0; i < keys.length; i++) {{ "
+ f" let val = values[i]; "
+ f" if (flags[i] === '1') val = JSON.parse(val); "
+ f" row[keys[i]] = val; "
+ f" }} "
+ f" const result = {call_expr}; "
+ f" if (result instanceof Date) "
+ f"return {{__type__: 'date', value: result.toISOString()}}; "
+ f" return result; "
+ f"}}")
if expression:
- source = '\n'.join(['function(__row__) {'] + [
- f' {name} = __row__.{name}'
- for name in original_fields if name in expression
- ] + [' return (' + expression + ')'] + ['}'])
- js_func = _CustomJsObjectWrapper(js2py.eval_js(source))
+ args = [
+ name for name in original_fields
+ if name.isidentifier() and name in expression
+ ]
+ parses = []
+ for i, arg in enumerate(args):
+ parses.append(f" if (flags[{i}] === '1') {arg} = JSON.parse({arg});")
+
+ source = f"""
+function fn(serialized_flags, {", ".join(args)}) {{
Review Comment:
done
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]