claudevdm commented on code in PR #35288: URL: https://github.com/apache/beam/pull/35288#discussion_r2154879175
##########
sdks/python/apache_beam/yaml/json_utils.py:
##########

@@ -184,14 +191,25 @@ def json_to_row(beam_type: schema_pb2.FieldType) -> Callable[[Any], Any]:
     value_converter = json_to_row(beam_type.map_type.value_type)
     return lambda value: {k: value_converter(v) for (k, v) in value.items()}
   elif type_info == "row_type":
+    fields = {field.name: field for field in beam_type.row_type.schema.fields}
     converters = {
-        field.name: json_to_row(field.type)
-        for field in beam_type.row_type.schema.fields
+        name: json_to_row(field.type)
+        for name, field in fields.items()
     }
-    return lambda value: beam.Row(
-        **
-        {name: convert(value[name])
-         for (name, convert) in converters.items()})
+
+    def convert_row(value):
+      kwargs = {}
+      for name, convert in converters.items():
+        field = fields[name]

Review Comment:
The test is failing because we are trying to pickle the actual message descriptor captured in the closure. I tested the following, which works:

```
  elif type_info == "row_type":
    field_nullable_status = {
        field.name: field.type.nullable
        for field in beam_type.row_type.schema.fields
    }
    converters = {
        field.name: json_to_row(field.type)
        for field in beam_type.row_type.schema.fields
    }

    def convert_row(value):
      kwargs = {}
      for name, convert in converters.items():
        if name in value:
          kwargs[name] = convert(value[name])
        elif field_nullable_status[name]:
          kwargs[name] = convert(None)
        else:
          raise KeyError(f"Missing required field: {name}")
      return beam.Row(**kwargs)

    return convert_row
```

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@beam.apache.org

For queries about this service, please contact Infrastructure at: us...@infra.apache.org