shunping opened a new pull request, #35862:
URL: https://github.com/apache/beam/pull/35862
We received an internal report (internal bug id: 430560535) that
CoGroupByKey (cogbk) is not honoring custom coders.
The code below reproduces the issue.
```Python
"""Test Beam's use of coders for keys in CoGroupByKey."""
import apache_beam as beam
from apache_beam.options import pipeline_options


class _Unpicklable:
  def __init__(self, value):
    self.value = value

  def __getstate__(self):
    raise NotImplementedError()

  def __setstate__(self, state):
    raise NotImplementedError()

  def __repr__(self):
    return f"Unpicklable({self.value})"


class _UnpicklableCoder(beam.coders.Coder):
  """."""
  def encode(self, value):
    return str(value.value).encode()

  def decode(self, encoded):
    return _Unpicklable(int(encoded.decode()))

  def to_type_hint(self):
    return _Unpicklable

  def is_deterministic(self):
    return True


beam.coders.registry.register_coder(_Unpicklable, _UnpicklableCoder)


def pipeline_fn(root):
  values = [_Unpicklable(i) for i in range(5)]
  xs = root | beam.Create(values) | beam.WithKeys(lambda x: x)
  return (
      {'x': xs}
      | beam.CoGroupByKey()
      | beam.FlatMapTuple(
          lambda k, tagged: (k.value, tagged['x'][0].value * 2))
      | beam.LogElements()
  )


def main():
  options = pipeline_options.PipelineOptions(
      runner='DirectRunner', direct_num_workers=1,
      type_check_additional='all'
  )
  with beam.Pipeline(options=options) as pipeline:
    _ = pipeline_fn(pipeline)


if __name__ == '__main__':
  main()
```
Running this code raises a `NotImplementedError` because a pickle-based
coder is used for the key instead of the registered coder.
This PR fixes the problem by propagating type hints correctly.
Related PR: #33932
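
To illustrate why the error surfaces, here is a minimal sketch that uses only the standard library and does not involve Beam at all. The names `Unpicklable`, `custom_encode`, `custom_decode`, and `pickling_fails` are hypothetical stand-ins for the class and coder in the reproduction above. It only shows that any pickle-based encoding of such a key reaches `__getstate__` and fails, while the custom encoding round-trips; it does not model how Beam selects a coder.

```python
import pickle


class Unpicklable:
    """Hypothetical stand-in for the `_Unpicklable` key type above."""

    def __init__(self, value):
        self.value = value

    def __getstate__(self):
        # Any pickle-based encoding of this object ends up here.
        raise NotImplementedError()


def custom_encode(obj):
    # Mirrors `_UnpicklableCoder.encode`: serialize only the wrapped int.
    return str(obj.value).encode()


def custom_decode(data):
    # Mirrors `_UnpicklableCoder.decode`: rebuild the object from the int.
    return Unpicklable(int(data.decode()))


def pickling_fails(obj):
    """Return True if pickling `obj` raises NotImplementedError."""
    try:
        pickle.dumps(obj)
    except NotImplementedError:
        return True
    return False


key = Unpicklable(3)
roundtripped = custom_decode(custom_encode(key))
print(roundtripped.value, pickling_fails(key))
```

If the element type hint is lost somewhere in the transform, the registered coder cannot be looked up for the key, which is consistent with the pickle path being taken in the report.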