This is an automated email from the ASF dual-hosted git repository.

bhulette pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
     new 2dc2ad455d1 [BEAM-14281] add as_deterministic_coder to nullable coder (#17322)
2dc2ad455d1 is described below

commit 2dc2ad455d1a5a6877da851e05ad90d67c084871
Author: johnjcasey <95318300+johnjca...@users.noreply.github.com>
AuthorDate: Fri Apr 8 16:46:58 2022 -0400

    [BEAM-14281] add as_deterministic_coder to nullable coder (#17322)

    * [BEAM-14281] add as_deterministic_coder to nullable coder

    * Update coders_test.py

    Co-authored-by: Robert Bradshaw <rober...@gmail.com>
---
 sdks/python/apache_beam/coders/coders.py      |  8 ++++++++
 sdks/python/apache_beam/coders/coders_test.py | 16 ++++++++++++++++
 2 files changed, 24 insertions(+)

diff --git a/sdks/python/apache_beam/coders/coders.py b/sdks/python/apache_beam/coders/coders.py
index fce397df626..19463443386 100644
--- a/sdks/python/apache_beam/coders/coders.py
+++ b/sdks/python/apache_beam/coders/coders.py
@@ -627,6 +627,14 @@ class NullableCoder(FastCoder):
     # type: () -> bool
     return self._value_coder.is_deterministic()
 
+  def as_deterministic_coder(self, step_label, error_message=None):
+    if self.is_deterministic():
+      return self
+    else:
+      deterministic_value_coder = self._value_coder.as_deterministic_coder(
+          step_label, error_message)
+      return NullableCoder(deterministic_value_coder)
+
   def __eq__(self, other):
     return (
         type(self) == type(other) and self._value_coder == other._value_coder)
diff --git a/sdks/python/apache_beam/coders/coders_test.py b/sdks/python/apache_beam/coders/coders_test.py
index 49ad33202f0..0a30a320e90 100644
--- a/sdks/python/apache_beam/coders/coders_test.py
+++ b/sdks/python/apache_beam/coders/coders_test.py
@@ -21,7 +21,9 @@ import logging
 import unittest
 
 import proto
+import pytest
 
+from apache_beam import typehints
 from apache_beam.coders import proto2_coder_test_messages_pb2 as test_message
 from apache_beam.coders import coders
 from apache_beam.coders.avro_record import AvroRecord
@@ -220,6 +222,20 @@ class FallbackCoderTest(unittest.TestCase):
     self.assertEqual(DummyClass(), coder.decode(coder.encode(DummyClass())))
 
 
+class NullableCoderTest(unittest.TestCase):
+  def test_determinism(self):
+    deterministic = coders_registry.get_coder(typehints.Optional[int])
+    deterministic.as_deterministic_coder('label')
+
+    complex_deterministic = coders_registry.get_coder(
+        typehints.Optional[DummyClass])
+    complex_deterministic.as_deterministic_coder('label')
+
+    nondeterministic = coders.NullableCoder(coders.Base64PickleCoder())
+    with pytest.raises(ValueError):
+      nondeterministic.as_deterministic_coder('label')
+
+
 if __name__ == '__main__':
   logging.getLogger().setLevel(logging.INFO)
   unittest.main()