robertwb commented on code in PR #28462:
URL: https://github.com/apache/beam/pull/28462#discussion_r1332273256
##########
sdks/python/apache_beam/yaml/yaml_mapping.py:
##########
@@ -190,189 +253,169 @@ def with_exception_handling(self, **kwargs):
return self
-# TODO(yaml): Should Filter and Explode be distinct operations from Project?
-# We'll want these per-language.
@beam.ptransform.ptransform_fn
-def _PythonProjectionTransform(
- pcoll,
- *,
- fields,
- transform_name,
- language,
- keep=None,
- explode=(),
- cross_product=True,
- error_handling=None):
- original_fields = [
- name for (name, _) in named_fields_from_element_type(pcoll.element_type)
- ]
+@maybe_with_exception_handling_transform_fn
+def _PyJsFilter(
+ pcoll, keep: Union[str, Dict[str, str]], language: Optional[str] = None):
- if error_handling is None:
- error_handling_args = None
+ input_schema = dict(named_fields_from_element_type(pcoll.element_type))
+ if isinstance(keep, str) and keep in input_schema:
+ keep_fn = lambda row: getattr(row, keep)
else:
- error_handling_args = {
- 'dead_letter_tag' if k == 'output' else k: v
- for (k, v) in error_handling.items()
- }
+ keep_fn = _as_callable(list(input_schema.keys()), keep, "keep", language)
+ return pcoll | beam.Filter(keep_fn)
Review Comment:
Filter passes its inputs through unchanged as its outputs. From the user's point
of view, they don't care whether these are Row objects or NamedTuple objects
(though the equality semantics are a bit strict).
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]